Completed
Push — master ( 04e536...b701b2 )
by
unknown
18s queued 10s
created

PhpAmqpLib/Connection/AbstractConnection.php (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis. Consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AbstractChannel;
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
7
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
8
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
9
use PhpAmqpLib\Exception\AMQPIOException;
10
use PhpAmqpLib\Exception\AMQPNoDataException;
11
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
12
use PhpAmqpLib\Exception\AMQPRuntimeException;
13
use PhpAmqpLib\Exception\AMQPSocketException;
14
use PhpAmqpLib\Exception\AMQPTimeoutException;
15
use PhpAmqpLib\Wire\AMQPReader;
16
use PhpAmqpLib\Wire\AMQPTable;
17
use PhpAmqpLib\Wire\AMQPWriter;
18
use PhpAmqpLib\Wire\IO\AbstractIO;
19
20
class AbstractConnection extends AbstractChannel
21
{
22
    /**
23
     * @var array
24
     * @internal
25
     */
26
    public static $LIBRARY_PROPERTIES = array(
27
        'product' => array('S', 'AMQPLib'),
28
        'platform' => array('S', 'PHP'),
29
        'version' => array('S', '2.9'),
30
        'information' => array('S', ''),
31
        'copyright' => array('S', ''),
32
        'capabilities' => array(
33
            'F',
34
            array(
35
                'authentication_failure_close' => array('t', true),
36
                'publisher_confirms' => array('t', true),
37
                'consumer_cancel_notify' => array('t', true),
38
                'exchange_exchange_bindings' => array('t', true),
39
                'basic.nack' => array('t', true),
40
                'connection.blocked' => array('t', true)
41
            )
42
        )
43
    );
44
45
    /**
46
     * @var AMQPChannel[]
47
     * @internal
48
     */
49
    public $channels = array();
50
51
    /** @var int */
52
    protected $version_major;
53
54
    /** @var int */
55
    protected $version_minor;
56
57
    /** @var array */
58
    protected $server_properties;
59
60
    /** @var array */
61
    protected $mechanisms;
62
63
    /** @var array */
64
    protected $locales;
65
66
    /** @var bool */
67
    protected $wait_tune_ok;
68
69
    /** @var string */
70
    protected $known_hosts;
71
72
    /** @var AMQPReader */
73
    protected $input;
74
75
    /** @var string */
76
    protected $vhost;
77
78
    /** @var bool */
79
    protected $insist;
80
81
    /** @var string */
82
    protected $login_method;
83
84
    /** @var string */
85
    protected $login_response;
86
87
    /** @var string */
88
    protected $locale;
89
90
    /** @var int */
91
    protected $heartbeat;
92
93
    /** @var float */
94
    protected $last_frame;
95
96
    /** @var int */
97
    protected $channel_max = 65535;
98
99
    /** @var int */
100
    protected $frame_max = 131072;
101
102
     /** @var array Constructor parameters for clone */
103
    protected $construct_params;
104
105
    /** @var bool Close the connection in destructor */
106
    protected $close_on_destruct = true;
107
108
    /** @var bool Maintain connection status */
109
    protected $is_connected = false;
110
111
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
112
    protected $io;
113
114
    /** @var \PhpAmqpLib\Wire\AMQPReader */
115
    protected $wait_frame_reader;
116
117
    /** @var callable Handles connection blocking from the server */
118
    private $connection_block_handler;
119
120
    /** @var callable Handles connection unblocking from the server */
121
    private $connection_unblock_handler;
122
123
    /** @var int Connection timeout value*/
124
    protected $connection_timeout ;
125
126
    /**
127
     * Circular buffer to speed up prepare_content().
128
     * Max size limited by $prepare_content_cache_max_size.
129
     *
130
     * @var array
131
     * @see prepare_content()
132
     */
133
    private $prepare_content_cache = array();
134
135
    /** @var int Maximal size of $prepare_content_cache */
136
    private $prepare_content_cache_max_size = 100;
137
138
    /**
139
     * Maximum time to wait for channel operations, in seconds
140
     * @var float $channel_rpc_timeout
141
     */
142
    private $channel_rpc_timeout;
143
144
    /**
     * Stores the connection settings, builds the login response from the
     * credentials and, when connectOnConstruct() allows it, connects right away.
     *
     * The raw argument list is remembered so that __clone() can replay it.
     *
     * @param string $user
     * @param string $password
     * @param string $vhost
     * @param bool $insist
     * @param string $login_method
     * @param null $login_response Kept in the signature; the stored response is
     *                             always rebuilt from $user/$password (or reset
     *                             to null when either of them is falsy).
     * @param string $locale
     * @param AbstractIO $io
     * @param int $heartbeat
     * @param int $connection_timeout
     * @param float $channel_rpc_timeout
     * @throws \Exception
     */
    public function __construct(
        $user,
        $password,
        $vhost = '/',
        $insist = false,
        $login_method = 'AMQPLAIN',
        $login_response = null,
        $locale = 'en_US',
        AbstractIO $io,
        $heartbeat = 0,
        $connection_timeout = 0,
        $channel_rpc_timeout = 0.0
    ) {
        // Remember the arguments exactly as passed; __clone() re-invokes the constructor with them
        $this->construct_params = func_get_args();

        $this->wait_frame_reader = new AMQPReader(null);
        $this->vhost = $vhost;
        $this->insist = $insist;
        $this->login_method = $login_method;
        $this->locale = $locale;
        $this->io = $io;
        $this->heartbeat = $heartbeat;
        $this->connection_timeout = $connection_timeout;
        $this->channel_rpc_timeout = $channel_rpc_timeout;

        // Without both credentials there is no login response at all
        $this->login_response = null;
        if ($user && $password) {
            $credentials = new AMQPWriter();
            $credentials->write_table(array(
                'LOGIN' => array('S', $user),
                'PASSWORD' => array('S', $password)
            ));

            // Skip the length: the first 4 bytes of the serialized table are dropped
            $encoded = $credentials->getvalue();
            $this->login_response = mb_substr($encoded, 4, mb_strlen($encoded, 'ASCII') - 4, 'ASCII');
        }

        // Lazy connections postpone the actual connect
        if ($this->connectOnConstruct()) {
            $this->connect();
        }
    }
205
206
    /**
     * Connects to the AMQP server.
     *
     * Opens the IO layer, sends the protocol header and runs the
     * start / tune / open exchange. If x_open() returns a host (redirect), the
     * channels and socket are closed and the whole sequence is repeated.
     * On any exception the connection is flagged as disconnected, channels,
     * input and socket are closed, and the exception is rethrown.
     *
     * @throws \Exception
     */
    protected function connect()
    {
        try {
            // Keep going until the handshake succeeds
            while (!$this->isConnected()) {
                // Optimistically mark as connected; reverted below on redirect or failure
                $this->setIsConnected(true);

                // Open the underlying socket
                $this->io->connect();

                $this->channels = array();
                // The connection object itself is treated as channel 0
                parent::__construct($this, 0);

                $this->input = new AMQPReader(null, $this->io);

                $this->write($this->amqp_protocol_header);
                // assume the frame was sent successfully; wait_channel() uses this timestamp
                $this->last_frame = microtime(true);

                $startMethods = array($this->waitHelper->get_wait('connection.start'));
                $this->wait($startMethods, false, $this->connection_timeout);
                $this->x_start_ok(
                    $this->getLibraryProperties(),
                    $this->login_method,
                    $this->login_response,
                    $this->locale
                );

                // x_tune_ok() clears this flag once tuning has been answered
                $this->wait_tune_ok = true;
                while ($this->wait_tune_ok) {
                    $this->wait(array(
                        $this->waitHelper->get_wait('connection.secure'),
                        $this->waitHelper->get_wait('connection.tune')
                    ));
                }

                $host = $this->x_open($this->vhost, '', $this->insist);
                if (!$host) {
                    // No redirect: the connection is established
                    $this->io->reenableHeartbeat();
                    return null;
                }

                // Redirected: drop channels and socket, then loop and try again
                $this->setIsConnected(false);
                $this->closeChannels();
                $this->close_socket();
            }
        } catch (\Exception $e) {
            // Something went wrong: reset the connection state before rethrowing
            $this->setIsConnected(false);
            $this->closeChannels();
            $this->close_input();
            $this->close_socket();
            throw $e;
        }
    }
268
269
    /**
     * Closes the current connection (best effort) and connects again with the
     * settings stored on this object.
     *
     * Channels that existed before are not recreated.
     */
    public function reconnect()
    {
        // Best-effort AMQP-level close; failures are swallowed by safeClose()
        $this->safeClose();

        // Drop the socket/stream before connecting again
        $this->io->close();

        // getIO can initiate the connection setting via LazyConnection, so force the flag here
        $this->setIsConnected(false);

        $this->connect();
    }
282
283
    /**
     * Re-runs the constructor on the clone with the arguments that were
     * originally passed, so the clone sets up its own connection state.
     */
    public function __clone()
    {
        $constructor = array($this, '__construct');
        call_user_func_array($constructor, $this->construct_params);
    }
290
291 12
    /**
     * Closes the connection on destruction unless that was disabled through
     * set_close_on_destruct().
     */
    public function __destruct()
    {
        if (!$this->close_on_destruct) {
            return;
        }

        $this->safeClose();
    }
297
298
    /**
     * Attempts to close the connection without ever throwing.
     *
     * close() is only called when an input reader is present; any exception
     * it raises is discarded.
     */
    protected function safeClose()
    {
        try {
            // !empty() covers both "not set" and "set but falsy"
            if (!empty($this->input)) {
                $this->close();
            }
        } catch (\Exception $e) {
            // Intentionally ignored: closing here is best-effort
        }
    }
311
312
    /**
     * Delegates to the IO layer's select() and updates the connection state
     * when that call fails.
     *
     * The error handling mirrors write(); the analysis tool flags the two as
     * duplicated code.
     *
     * @param int $sec
     * @param int $usec
     * @return mixed
     */
    public function select($sec, $usec = 0)
    {
        try {
            $ready = $this->io->select($sec, $usec);
        } catch (AMQPConnectionClosedException $e) {
            // Connection is gone: release input and socket as well
            $this->do_close();
            throw $e;
        } catch (AMQPRuntimeException $e) {
            $this->setIsConnected(false);
            throw $e;
        }

        return $ready;
    }
329
330
    /**
     * Controls whether __destruct() closes the connection.
     *
     * Useful after a fork, when the child must not close the connection that
     * belongs to the parent process.
     *
     * @param bool $close
     */
    public function set_close_on_destruct($close = true)
    {
        $this->close_on_destruct = $close ? true : false;
    }
340
341 150
    /**
     * Closes the input reader, if there is one, and forgets it.
     */
    protected function close_input()
    {
        // The debug helper may not be set yet, hence the guard
        if ($this->debug) {
            $this->debug->debug_msg('closing input');
        }

        if ($this->input !== null) {
            $this->input->close();
            $this->input = null;
        }
    }
350
351 150
    /**
     * Closes the IO object when one is set.
     */
    protected function close_socket()
    {
        // The debug helper may not be set yet, hence the guard
        if ($this->debug) {
            $this->debug->debug_msg('closing socket');
        }

        if (!$this->io) {
            return;
        }

        $this->io->close();
    }
359
360
    /**
     * Hex-dumps the data to the debug helper and writes it through the IO
     * layer, updating the connection state when the write fails.
     *
     * The error handling mirrors select(); the analysis tool flags the two as
     * duplicated code.
     *
     * @param string $data
     */
    public function write($data)
    {
        $this->debug->debug_hexdump($data);

        try {
            $this->io->write($data);
        } catch (AMQPConnectionClosedException $closed) {
            // Connection is gone: release input and socket as well
            $this->do_close();
            throw $closed;
        } catch (AMQPRuntimeException $failure) {
            $this->setIsConnected(false);
            throw $failure;
        }
    }
377
378 144
    /**
     * Marks the connection as closed, then releases the input reader and the
     * socket.
     */
    protected function do_close()
    {
        // Flip the flag first, then tear down reader and socket
        $this->setIsConnected(false);
        $this->close_input();
        $this->close_socket();
    }
384
385
    /**
     * Returns the lowest channel id in 1..channel_max that has no entry in
     * $this->channels.
     *
     * @return int
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When every id is taken
     */
    public function get_free_channel_id()
    {
        $candidate = 1;
        while ($candidate <= $this->channel_max) {
            if (!isset($this->channels[$candidate])) {
                return $candidate;
            }
            $candidate++;
        }

        throw new AMQPRuntimeException('No free channel ids');
    }
399
400
    /**
     * Builds the content frames via prepare_content() and writes them out.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter $pkt Optional writer to append to; a new one is created when omitted
     */
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        // prepare_content() creates the writer when $pkt is null, so its return
        // value must be used; otherwise getvalue() would be called on null.
        $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
        $this->write($pkt->getvalue());
    }
414
415
    /**
     * Serializes a content header plus the body, split into frames, into $pkt.
     *
     * Returns a new AMQPWriter or mutates the provided $pkt. The fixed leading
     * part of the header frame is cached per channel / properties / class id /
     * weight in $prepare_content_cache.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter $pkt
     * @return AMQPWriter
     */
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        // Look up (or build) the cached leading bytes of the header frame
        $cacheKey = sprintf(
            '%s|%s|%s|%s',
            $channel,
            $packed_properties,
            $class_id,
            $weight
        );

        if (!isset($this->prepare_content_cache[$cacheKey])) {
            $header = new AMQPWriter();
            $header->write_octet(2);
            $header->write_short($channel);
            $header->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $header->write_short($class_id);
            $header->write_short($weight);
            $this->prepare_content_cache[$cacheKey] = $header->getvalue();

            // Keep the cache bounded: drop the first (oldest inserted) entry
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                reset($this->prepare_content_cache);
                $evicted = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$evicted]);
            }
        }
        $pkt->write($this->prepare_content_cache[$cacheKey]);

        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);

        // 0xCE terminates every frame (wait_frame() checks for the same octet)
        $pkt->write_octet(0xCE);

        // memory efficiency: walk the string in chunks instead of copying it
        // whole; good for very large packets (close in size to memory_limit)
        $chunkSize = $this->frame_max - 8;
        $bodyLength = mb_strlen($body, 'ASCII');
        for ($offset = 0; $offset < $bodyLength; $offset += $chunkSize) {
            $chunk = mb_substr($body, $offset, $chunkSize, 'ASCII');

            // One type-3 frame per body chunk
            $pkt->write_octet(3);
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($chunk, 'ASCII'));

            $pkt->write($chunk);

            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }
482
483
    /**
     * Builds a method frame for the given channel and writes it out.
     *
     * @param string $channel
     * @param array $method_sig
     * @param AMQPWriter|string $args
     * @param null $pkt
     */
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $frame = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
        $this->write($frame->getvalue());

        // NOTE(review): prepare_channel_method_frame() already passes $method_sig
        // to debug_method_signature1(), so the signature is logged twice per send.
        $this->debug->debug_method_signature1($method_sig);
    }
495
496
    /**
     * Serializes a method frame into $pkt.
     *
     * Returns a new AMQPWriter or mutates the provided $pkt.
     *
     * @param string $channel
     * @param array $method_sig Two-element array: class id, method id
     * @param AMQPWriter|string $args Already-encoded arguments, or a writer holding them
     * @param AMQPWriter $pkt
     * @return AMQPWriter
     */
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $encodedArgs = ($args instanceof AMQPWriter) ? $args->getvalue() : $args;

        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        $pkt->write_octet(1);
        $pkt->write_short($channel);
        // 4 = length of class_id and method_id in the payload
        $pkt->write_long(mb_strlen($encodedArgs, 'ASCII') + 4);

        $pkt->write_short($method_sig[0]); // class_id
        $pkt->write_short($method_sig[1]); // method_id
        $pkt->write($encodedArgs);

        // Frame terminator
        $pkt->write_octet(0xCE);

        $this->debug->debug_method_signature1($method_sig);

        return $pkt;
    }
528
529
    /**
     * Waits for a frame from the server and returns its parts.
     *
     * The input reader's timeout is switched to $timeout for the duration of
     * the read and restored afterwards (also on timeout / no-data errors).
     *
     * @param int|float|null $timeout
     * @return array Frame type, channel and payload, in that order
     * @throws \Exception
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    protected function wait_frame($timeout = 0)
    {
        if ($this->input === null) {
            $this->setIsConnected(false);
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }

        $previousTimeout = $this->input->getTimeout();
        $this->input->setTimeout($timeout);

        try {
            // frame_type + channel_id + size
            $this->wait_frame_reader->reuse(
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
            );

            $frame_type = $this->wait_frame_reader->read_octet();
            // NOTE: the static analysis tool reports PROTOCOL_CONSTANTS_CLASS as a
            // deprecated property; its explanatory message should say whether and
            // when it will be removed and what to use instead.
            $class = self::$PROTOCOL_CONSTANTS_CLASS;
            if (!array_key_exists($frame_type, $class::$FRAME_TYPES)) {
                throw new AMQPInvalidFrameException('Invalid frame type ' . $frame_type);
            }
            $channel = $this->wait_frame_reader->read_short();
            $size = $this->wait_frame_reader->read_long();

            // payload + frame-end octet
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

            $payload = $this->wait_frame_reader->read($size);
            $frameEnd = $this->wait_frame_reader->read_octet();
        } catch (AMQPTimeoutException $timedOut) {
            $this->input->setTimeout($previousTimeout);
            throw $timedOut;
        } catch (AMQPNoDataException $noData) {
            // The reader may be gone by now, so check before restoring
            if ($this->input) {
                $this->input->setTimeout($previousTimeout);
            }
            throw $noData;
        } catch (AMQPConnectionClosedException $closed) {
            $this->do_close();
            throw $closed;
        }

        $this->input->setTimeout($previousTimeout);

        if ($frameEnd != 0xCE) {
            throw new AMQPInvalidFrameException(sprintf(
                'Framing error, unexpected byte: %x',
                $frameEnd
            ));
        }

        return array($frame_type, $channel, $payload);
    }
593
594
    /**
     * Waits for a frame from the server destined for a particular channel.
     *
     * Heartbeat frames are skipped, frames for other known channels are queued
     * on those channels, and a method frame for channel 0 is handled
     * immediately through wait().
     *
     * @param string $channel_id
     * @param int|float|null $timeout
     * @return array Frame type and payload, in that order
     */
    protected function wait_channel($channel_id, $timeout = 0)
    {
        // Work on a copy so the caller's timeout value stays untouched
        $remaining = $timeout;
        while (true) {
            $startedAt = microtime(true);
            try {
                list($frame_type, $frame_channel, $payload) = $this->wait_frame($remaining);
            } catch (AMQPTimeoutException $e) {
                // A timeout counts as a missed heartbeat once nothing has arrived
                // for more than twice the heartbeat interval
                if ($this->heartbeat && $this->last_frame && microtime(true) - ($this->heartbeat * 2) > $this->last_frame) {
                    $this->debug->debug_msg("missed server heartbeat (at threshold * 2)");
                    $this->setIsConnected(false);
                    throw new AMQPHeartbeatMissedException("Missed server heartbeat");
                }

                throw $e;
            }

            $this->last_frame = microtime(true);

            if ($frame_channel === 0 && $frame_type === 8) {
                // skip heartbeat frames and reduce the timeout by the time passed
                $this->debug->debug_msg("received server heartbeat");
                if ($remaining > 0) {
                    $remaining -= $this->last_frame - $startedAt;
                    if ($remaining <= 0) {
                        // Timeout already used up: throw without calling wait_frame again
                        throw new AMQPTimeoutException("Timeout waiting on channel");
                    }
                }
                continue;
            }

            if ($frame_channel == $channel_id) {
                return array($frame_type, $payload);
            }

            // Not the channel we were looking for. Queue this frame for later,
            // when the other channel is looking for frames. The channel may no
            // longer exist; it could have been closed by a previous Exception.
            if (isset($this->channels[$frame_channel])) {
                array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
            }

            // A method frame queued for channel 0 (the Connection itself) is
            // probably a close method in reaction to some error, so deal with
            // it right away.
            if ($frame_type == 1 && $frame_channel == 0) {
                $this->wait();
            }
        }
    }
655
656
    /**
     * Returns the channel object registered under $channel_id, creating and
     * registering a new one when none exists. A falsy id means "pick the next
     * free id".
     *
     * @param int $channel_id
     * @return AMQPChannel
     */
    public function channel($channel_id = null)
    {
        if (isset($this->channels[$channel_id])) {
            return $this->channels[$channel_id];
        }

        if (!$channel_id) {
            $channel_id = $this->get_free_channel_id();
        }

        $created = new AMQPChannel($this->connection, $channel_id, true, $this->channel_rpc_timeout);
        $this->channels[$channel_id] = $created;

        return $created;
    }
675
676
    /**
     * Requests a connection close and waits for the server's close-ok.
     *
     * Returns null right away when there is no protocol writer or the
     * connection is not marked as connected. If anything fails during the
     * close exchange, do_close() runs and the exception is rethrown.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig
     * @return mixed|null
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        // Heartbeats are switched off before anything else, even if we bail out below
        $this->io->disableHeartbeat();
        if (empty($this->protocolWriter) || !$this->isConnected()) {
            return null;
        }

        try {
            $this->closeChannels();
            list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
                $reply_code,
                $reply_text,
                $method_sig[0],
                $method_sig[1]
            );
            $this->send_method_frame(array($class_id, $method_id), $args);

            $expected = array($this->waitHelper->get_wait('connection.close_ok'));
            $result = $this->wait($expected, false, $this->connection_timeout);
        } catch (\Exception $exception) {
            $this->do_close();
            throw $exception;
        }

        $this->setIsConnected(false);

        return $result;
    }
715
716
    /**
     * Handles a close request: reads the close details, confirms the close
     * and reports it to the caller as an exception.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException Always, carrying the close details
     */
    protected function connection_close(AMQPReader $reader)
    {
        // Field order is fixed: code, text, class id, method id
        $reply_code = $reader->read_short();
        $reply_text = $reader->read_shortstr();
        $failedClass = $reader->read_short();
        $failedMethod = $reader->read_short();

        $this->x_close_ok();

        throw new AMQPProtocolConnectionException($reply_code, $reply_text, array($failedClass, $failedMethod));
    }
731
732
    /**
     * Confirms a connection close by sending close-ok, then closes locally.
     */
    protected function x_close_ok()
    {
        // get_wait() yields a comma-separated signature string; split it into its parts
        $signature = explode(',', $this->waitHelper->get_wait('connection.close_ok'));
        $this->send_method_frame($signature);

        $this->do_close();
    }
742
743
    /**
     * Handles the close-ok confirmation by closing the connection locally.
     *
     * @param AMQPReader $args Not read by this handler
     */
    protected function connection_close_ok($args)
    {
        // Nothing to parse; just tear the connection down
        $this->do_close();
    }
752
753
    /**
     * Sends the open request for a virtual host and waits for the reply.
     *
     * With protocol version 0.8 a redirect reply is accepted as well.
     *
     * @param string $virtual_host
     * @param string $capabilities
     * @param bool $insist
     * @return mixed Whatever the handler for the received reply returns
     */
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $request = new AMQPWriter();
        $request->write_shortstr($virtual_host);
        $request->write_shortstr($capabilities);
        $request->write_bits(array($insist));
        $this->send_method_frame(array(10, 40), $request);

        $expected = array(
            $this->waitHelper->get_wait('connection.open_ok')
        );

        if ($this->protocolVersion == '0.8') {
            $expected[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($expected);
    }
777
778
    /**
     * Signals that the connection is ready; stores the known hosts string.
     *
     * @param AMQPReader $args
     */
    protected function connection_open_ok($args)
    {
        $hosts = $args->read_shortstr();
        $this->known_hosts = $hosts;

        $this->debug->debug_msg('Open OK! known_hosts: ' . $hosts);
    }
788
789
    /**
     * Handles a redirect: the server asks the client to use a different server.
     *
     * @param AMQPReader $args
     * @return string The host the client was redirected to
     */
    protected function connection_redirect($args)
    {
        // Field order is fixed: target host first, then the known hosts list
        $target = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();

        $message = sprintf(
            'Redirected to [%s], known_hosts [%s]',
            $target,
            $this->known_hosts
        );
        $this->debug->debug_msg($message);

        return $target;
    }
807
808
    /**
     * Security mechanism challenge.
     *
     * The challenge is read from the frame and discarded.
     * NOTE(review): no response is sent from here (x_secure_ok() is not
     * called) — confirm whether that is intended.
     *
     * @param AMQPReader $args
     */
    protected function connection_secure($args)
    {
        // Consume the challenge so the reader is advanced; the value itself is unused
        $args->read_longstr();
    }
817
818
    /**
     * Security mechanism response: sends $response as a long string with
     * method signature (10, 21).
     *
     * @param string $response
     */
    protected function x_secure_ok($response)
    {
        $payload = new AMQPWriter();
        $payload->write_longstr($response);

        $this->send_method_frame(array(10, 21), $payload);
    }
829
830
    /**
     * Starts connection negotiation: records what the server announced
     * (protocol version, server properties, mechanisms, locales) and passes
     * it on to the debug helper.
     *
     * @param AMQPReader $args
     */
    protected function connection_start($args)
    {
        // Fields must be read in exactly this order
        $this->version_major = $args->read_octet();
        $this->version_minor = $args->read_octet();
        $this->server_properties = $args->read_table();

        // Mechanisms and locales arrive as space-separated lists
        $mechanismList = $args->read_longstr();
        $this->mechanisms = explode(' ', $mechanismList);
        $localeList = $args->read_longstr();
        $this->locales = explode(' ', $localeList);

        $this->debug->debug_connection_start(
            $this->version_major,
            $this->version_minor,
            $this->server_properties,
            $this->mechanisms,
            $this->locales
        );
    }
851
852
    /**
     * Sends the start-ok reply (method signature (10, 11)) with the client
     * properties, chosen mechanism, login response and locale.
     *
     * @param AMQPTable|array $clientProperties
     * @param string $mechanism
     * @param string $response
     * @param string $locale
     */
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
    {
        $payload = new AMQPWriter();
        $payload->write_table($clientProperties);
        $payload->write_shortstr($mechanism);
        $payload->write_longstr($response);
        $payload->write_shortstr($locale);

        $this->send_method_frame(array(10, 11), $payload);
    }
867
868
    /**
     * Handles the server's proposed connection tuning parameters and answers
     * with tune-ok.
     *
     * Non-zero channel-max and frame-max values from the server replace the
     * local defaults. The server's heartbeat is only read when no local
     * heartbeat is set (null).
     *
     * @param AMQPReader $args
     */
    protected function connection_tune($args)
    {
        $proposedChannelMax = $args->read_short();
        if ($proposedChannelMax) {
            $this->channel_max = $proposedChannelMax;
        }

        $proposedFrameMax = $args->read_long();
        if ($proposedFrameMax) {
            $this->frame_max = $proposedFrameMax;
        }

        // use server proposed value if not set
        if ($this->heartbeat === null) {
            $this->heartbeat = $args->read_short();
        }

        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
    }
892
893
    /**
     * Sends the negotiated tuning parameters (method signature (10, 31)) and
     * clears the wait_tune_ok flag that connect() loops on.
     *
     * @param int $channel_max
     * @param int $frame_max
     * @param int $heartbeat
     */
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        $payload = new AMQPWriter();
        $payload->write_short($channel_max);
        $payload->write_long($frame_max);
        $payload->write_short($heartbeat);
        $this->send_method_frame(array(10, 31), $payload);

        // Tuning has been answered; connect() can leave its wait loop
        $this->wait_tune_ok = false;
    }
909
910
    /**
     * Exposes the socket held by the underlying IO object.
     *
     * @return resource
     * @deprecated No direct access to communication socket should be available.
     */
    public function getSocket()
    {
        $socket = $this->io->getSocket();

        return $socket;
    }
918
919
    /**
     * Exposes the IO object this connection communicates through.
     *
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
     * @deprecated
     */
    public function getIO()
    {
        $io = $this->io;

        return $io;
    }
927
928
    /**
     * Runs the heartbeat check of the underlying IO object, if heartbeats are enabled.
     *
     * @throws AMQPHeartbeatMissedException If too much time passed since last connection activity.
     * @throws AMQPConnectionClosedException If connection was closed due to network issues or timeouts.
     * @throws AMQPSocketException If connection was already closed.
     * @throws AMQPTimeoutException If heartbeat write takes too much time.
     * @throws AMQPIOException If other connection problems occurred.
     */
    public function checkHeartBeat()
    {
        // The connection only delegates; the IO object performs the check.
        $io = $this->io;
        $io->check_heartbeat();
    }
940
941
    /**
     * Reacts to a connection blocked notification.
     *
     * Reads the reason (a short string) from the frame arguments and passes
     * it as the only argument to the registered block handler.
     *
     * @param AMQPReader $args Reader positioned at the notification arguments.
     */
    protected function connection_blocked(AMQPReader $args)
    {
        $handler = $this->connection_block_handler;
        $handlerArguments = array($args->read_shortstr());

        $this->dispatch_to_handler($handler, $handlerArguments);
    }
951
952
    /**
     * Reacts to a connection unblocked notification.
     *
     * The notification carries no arguments, so the registered unblock handler
     * is dispatched with an empty argument list and $args is not read.
     *
     * @param AMQPReader $args Unused.
     */
    protected function connection_unblocked(AMQPReader $args)
    {
        $handler = $this->connection_unblock_handler;
        $noArguments = array();

        $this->dispatch_to_handler($handler, $noArguments);
    }
962
963
    /**
     * Registers the callback to run when the server reports the connection as blocked.
     *
     * The callback is stored as-is; no validation is performed here.
     *
     * @param callable $callback Handler for blocked notifications.
     */
    public function set_connection_block_handler($callback)
    {
        $blockHandler = $callback;
        $this->connection_block_handler = $blockHandler;
    }
972
973
    /**
     * Registers the callback to run when the server reports the connection as unblocked.
     *
     * The callback is stored as-is; no validation is performed here.
     *
     * @param callable $callback Handler for unblocked notifications.
     */
    public function set_connection_unblock_handler($callback)
    {
        $unblockHandler = $callback;
        $this->connection_unblock_handler = $unblockHandler;
    }
982
983
    /**
     * Reports the stored connection status, coerced to a boolean.
     *
     * @return bool
     */
    public function isConnected()
    {
        $connected = (bool) $this->is_connected;

        return $connected;
    }
992
993
    /**
     * Stores the connection status after coercing it to a boolean.
     *
     * @param bool $is_connected New status.
     */
    protected function setIsConnected($is_connected)
    {
        $status = (bool) $is_connected;
        $this->is_connected = $status;
    }
1002
1003
    /**
     * Closes every registered channel except the entry stored under key 0.
     *
     * Closing is best effort: an exception raised by one channel is discarded
     * so that the remaining channels are still closed.
     */
    protected function closeChannels()
    {
        foreach ($this->channels as $channelId => $openChannel) {
            // channels[0] is this connection object, so it is not closed here.
            if ($channelId !== 0) {
                try {
                    $openChannel->close();
                } catch (\Exception $closeError) {
                    // Deliberately ignored: a failed close must not stop the loop.
                }
            }
        }
    }
1020
1021
    /**
     * Tells whether the connection should be attempted during construction.
     *
     * This implementation always answers yes.
     *
     * @return bool
     */
    public function connectOnConstruct()
    {
        $connectImmediately = true;

        return $connectImmediately;
    }
1030
1031
    /**
     * Returns the server properties stored on this connection.
     *
     * @return array
     */
    public function getServerProperties()
    {
        $properties = $this->server_properties;

        return $properties;
    }
1038
1039
    /**
     * Returns the static library properties used to populate the client protocol information.
     *
     * @return array
     */
    public function getLibraryProperties()
    {
        $properties = self::$LIBRARY_PROPERTIES;

        return $properties;
    }
1048 12
1049 12
    /**
     * Creates a connection to the first host in the list that accepts one.
     *
     * Hosts are tried in iteration order. Each definition is validated first;
     * a definition missing a required key aborts the whole call immediately.
     * A failed connection attempt is remembered and the next host is tried.
     *
     * @param array|\Traversable $hosts Host definitions, each with the keys
     *        'host', 'port', 'user' and 'password', and an optional 'vhost'
     *        (defaults to "/").
     * @param array $options Passed through unchanged to try_create_connection().
     * @return mixed Whatever static::try_create_connection() returns for the
     *         first host that succeeds.
     * @throws \InvalidArgumentException If $hosts is not iterable, contains no
     *         host definitions, or a definition lacks a required key.
     * @throws \Exception The exception of the last failed attempt when no host
     *         could be connected to.
     */
    public static function create_connection($hosts, $options = array())
    {
        if (!is_array($hosts) && !($hosts instanceof \Traversable)) {
            throw new \InvalidArgumentException('A list of host definitions is required to create a connection.');
        }

        $latest_exception = null;
        foreach ($hosts as $hostdef) {
            // Validation errors are configuration mistakes, so they are not
            // swallowed by the per-host try/catch below.
            self::validate_host($hostdef);
            $vhost = isset($hostdef['vhost']) ? $hostdef['vhost'] : "/";

            try {
                return static::try_create_connection(
                    $hostdef['host'],
                    $hostdef['port'],
                    $hostdef['user'],
                    $hostdef['password'],
                    $vhost,
                    $options
                );
            } catch (\Exception $e) {
                $latest_exception = $e;
            }
        }

        // An empty list never enters the loop; without this guard the method
        // would attempt to throw null, which is an engine-level error.
        if ($latest_exception === null) {
            throw new \InvalidArgumentException('At least one host definition is required to create a connection.');
        }

        throw $latest_exception;
    }
1067
1068
    /**
     * Ensures a host definition carries every mandatory connection setting.
     *
     * Keys are checked in the order host, port, user, password. The first key
     * that is not set (isset() also rejects a null value) aborts validation.
     *
     * @param array $host Host definition to inspect.
     * @throws \InvalidArgumentException If a required key is not set.
     */
    public static function validate_host($host)
    {
        foreach (array('host', 'port', 'user', 'password') as $requiredKey) {
            if (isset($host[$requiredKey])) {
                continue;
            }

            throw new \InvalidArgumentException(sprintf("'%s' key is required.", $requiredKey));
        }
    }
1082
}
1083