Completed
Push — master ( 9b90e7...a2b649 )
by John
17:01 queued 14:42
created

PhpAmqpLib/Connection/AbstractConnection.php (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AMQPChannel;
5
use PhpAmqpLib\Channel\AbstractChannel;
6
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPTable;
11
use PhpAmqpLib\Wire\AMQPWriter;
12
use PhpAmqpLib\Wire\IO\AbstractIO;
13
use PhpAmqpLib\Wire\IO\SocketIO;
14
use PhpAmqpLib\Wire\IO\StreamIO;
15
16
/**
 * Base AMQP connection.
 *
 * The connection object doubles as channel 0 (see connect(), which calls the
 * AbstractChannel constructor with id 0). It owns the IO object, performs the
 * connection.start / tune / open handshake, frames outgoing methods and content,
 * and reads incoming frames, queueing those that belong to other channels.
 */
class AbstractConnection extends AbstractChannel
{
    /**
     * Client properties sent to the server by x_start_ok().
     *
     * Entries are (type tag, value) pairs in the form consumed by
     * AMQPWriter::write_table() (presumably 'S' = string, 't' = boolean,
     * 'F' = nested table - confirm against AMQPWriter).
     *
     * @var array
     */
    public static $LIBRARY_PROPERTIES = array(
        'product' => array('S', 'AMQPLib'),
        'platform' => array('S', 'PHP'),
        'version' => array('S', '2.4'),
        'information' => array('S', ''),
        'copyright' => array('S', ''),
        'capabilities' => array(
            'F',
            array(
                'authentication_failure_close' => array('t', true),
                'publisher_confirms' => array('t', true),
                'consumer_cancel_notify' => array('t', true),
                'exchange_exchange_bindings' => array('t', true),
                'basic.nack' => array('t', true),
                'connection.blocked' => array('t', true)
            )
        )
    );

    /** @var AMQPChannel[] Open channels indexed by channel id */
    public $channels = array();

    /** @var int Protocol major version announced by the server in connection.start */
    protected $version_major;

    /** @var int Protocol minor version announced by the server in connection.start */
    protected $version_minor;

    /** @var array Server properties table received in connection.start */
    protected $server_properties;

    /** @var array Security mechanisms offered by the server (space-separated list, exploded) */
    protected $mechanisms;

    /** @var array Locales offered by the server (space-separated list, exploded) */
    protected $locales;

    /** @var bool True while connect() is still waiting for connection.tune to be answered */
    protected $wait_tune_ok;

    /** @var string Known hosts string received in connection.open-ok / connection.redirect */
    protected $known_hosts;

    /** @var AMQPReader|null Reader bound to the IO object; null once the input has been closed */
    protected $input;

    /** @var string */
    protected $vhost;

    /** @var bool */
    protected $insist;

    /** @var string */
    protected $login_method;

    /** @var string|null Encoded login response sent in connection.start-ok; null without credentials */
    protected $login_response;

    /** @var string */
    protected $locale;

    /** @var int */
    protected $heartbeat;

    /** @var SocketIO */
    protected $sock;

    /** @var int Highest usable channel id; replaced by the server's value in connection_tune() when non-zero */
    protected $channel_max = 65535;

    /** @var int Maximum frame size in bytes; replaced by the server's value in connection_tune() when non-zero */
    protected $frame_max = 131072;

    /** @var array Constructor parameters for clone */
    protected $construct_params;

    /** @var bool Close the connection in destructor */
    protected $close_on_destruct = true;

    /** @var bool Maintain connection status */
    protected $is_connected = false;

    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
    protected $io;

    /** @var \PhpAmqpLib\Wire\AMQPReader Scratch reader reused by wait_frame() to decode each frame */
    protected $wait_frame_reader;

    /** @var callable Handles connection blocking from the server */
    private $connection_block_handler;

    /** @var callable Handles connection unblocking from the server */
    private $connection_unblock_handler;

    /**
     * Cache of serialized content-header prefixes used to speed up prepare_content().
     * Oldest entry is evicted once the size exceeds $prepare_content_cache_max_size.
     *
     * @var array
     * @see prepare_content()
     */
    private $prepare_content_cache;

    /** @var int Maximal size of $prepare_content_cache */
    private $prepare_content_cache_max_size;

    /**
     * Stores the connection settings, builds the login response from the
     * credentials and, unless connectOnConstruct() returns false, connects.
     *
     * Note: $io is a required parameter placed after optional ones. The order
     * is kept for backward compatibility, so in practice every preceding
     * argument has to be supplied by the caller.
     *
     * @param string $user
     * @param string $password
     * @param string $vhost
     * @param bool $insist
     * @param string $login_method
     * @param string|null $login_response Currently ignored: the stored response is always rebuilt
     *                                    from $user/$password, or set to null when either is empty
     * @param string $locale
     * @param AbstractIO $io
     * @param int $heartbeat
     * @throws \Exception
     */
    public function __construct(
        $user,
        $password,
        $vhost = '/',
        $insist = false,
        $login_method = 'AMQPLAIN',
        $login_response = null,
        $locale = 'en_US',
        AbstractIO $io,
        $heartbeat = 0
    ) {
        // save the params for the use of __clone
        $this->construct_params = func_get_args();

        $this->wait_frame_reader = new AMQPReader(null);
        $this->vhost = $vhost;
        $this->insist = $insist;
        $this->login_method = $login_method;
        $this->locale = $locale;
        $this->io = $io;
        $this->heartbeat = $heartbeat;

        if ($user && $password) {
            // Serialize the credentials with a local writer so the string-typed
            // $login_response property never temporarily holds an object.
            $login_writer = new AMQPWriter();
            $login_writer->write_table(array(
                'LOGIN' => array('S', $user),
                'PASSWORD' => array('S', $password)
            ));

            // Skip the length: drop the first 4 bytes of the serialized table
            $responseValue = $login_writer->getvalue();
            $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
        } else {
            $this->login_response = null;
        }

        $this->prepare_content_cache = array();
        $this->prepare_content_cache_max_size = 100;

        // Lazy Connection waits on connecting
        if ($this->connectOnConstruct()) {
            $this->connect();
        }
    }

    /**
     * Connects to the AMQP server.
     *
     * Opens the IO, registers this object as channel 0 and runs the
     * start / tune / open handshake. When x_open() yields a host (a redirect),
     * the channels and socket are closed and the loop tries again.
     *
     * @return null
     * @throws \Exception Any failure is rethrown after the connection is flagged as
     *                    disconnected and the channels are closed
     */
    protected function connect()
    {
        try {
            // Loop until we connect
            while (!$this->isConnected()) {
                // Assume we will connect, until we dont
                $this->setIsConnected(true);

                // Connect the socket
                $this->getIO()->connect();

                $this->channels = array();
                // The connection object itself is treated as channel 0
                parent::__construct($this, 0);

                $this->input = new AMQPReader(null, $this->getIO());

                $this->write($this->amqp_protocol_header);
                $this->wait(array($this->waitHelper->get_wait('connection.start')));
                $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale);

                // connection_tune() answers with x_tune_ok(), which clears this flag
                $this->wait_tune_ok = true;
                while ($this->wait_tune_ok) {
                    $this->wait(array(
                        $this->waitHelper->get_wait('connection.secure'),
                        $this->waitHelper->get_wait('connection.tune')
                    ));
                }

                $host = $this->x_open($this->vhost, '', $this->insist);
                if (!$host) {
                    return null; // we weren't redirected
                }

                $this->setIsConnected(false);
                $this->closeChannels();

                // we were redirected, close the socket, loop and try again
                $this->close_socket();
            }
        } catch (\Exception $e) {
            // Something went wrong, set the connection status
            $this->setIsConnected(false);
            $this->closeChannels();
            throw $e; // Rethrow exception
        }
    }

    /**
     * Reconnects using the original connection settings.
     * This will not recreate any channels that were established previously
     */
    public function reconnect()
    {
        // Try to close the AMQP connection
        $this->safeClose();
        // Reconnect the socket/stream then AMQP
        $this->getIO()->reconnect();
        // getIO can initiate the connection setting via LazyConnection, set it here to be sure
        $this->setIsConnected(false);
        $this->connect();
    }

    /**
     * Cloning will use the old properties to make a new connection to the same server
     */
    public function __clone()
    {
        call_user_func_array(array($this, '__construct'), $this->construct_params);
    }

    /**
     * Closes the connection on destruction unless disabled via set_close_on_destruct().
     */
    public function __destruct()
    {
        if ($this->close_on_destruct) {
            $this->safeClose();
        }
    }

    /**
     * Attempts to close the connection safely.
     *
     * Only tries when an input reader exists; any exception raised while
     * closing is deliberately swallowed (best-effort shutdown).
     */
    protected function safeClose()
    {
        try {
            if (isset($this->input) && $this->input) {
                $this->close();
            }
        } catch (\Exception $e) {
            // Nothing here
        }
    }

    /**
     * Delegates to the IO object's select().
     *
     * @param int $sec
     * @param int $usec
     * @return mixed
     */
    public function select($sec, $usec = 0)
    {
        return $this->getIO()->select($sec, $usec);
    }

    /**
     * Allows to not close the connection
     * it's useful after the fork when you don't want to close parent process connection
     *
     * @param bool $close
     */
    public function set_close_on_destruct($close = true)
    {
        $this->close_on_destruct = (bool) $close;
    }

    /**
     * Closes and discards the input reader, if there is one.
     */
    protected function close_input()
    {
        $this->debug->debug_msg('closing input');

        if (!is_null($this->input)) {
            $this->input->close();
            $this->input = null;
        }
    }

    /**
     * Closes the IO object, if there is one.
     */
    protected function close_socket()
    {
        $this->debug->debug_msg('closing socket');

        if (!is_null($this->getIO())) {
            $this->getIO()->close();
        }
    }

    /**
     * Writes raw bytes to the IO object.
     *
     * @param string $data
     * @throws AMQPRuntimeException Rethrown from the IO layer after the connection
     *                              has been flagged as disconnected
     */
    public function write($data)
    {
        $this->debug->debug_hexdump($data);

        try {
            $this->getIO()->write($data);
        } catch (AMQPRuntimeException $e) {
            $this->setIsConnected(false);
            throw $e;
        }
    }

    /**
     * Marks the connection as closed and releases the input reader and socket.
     */
    protected function do_close()
    {
        $this->setIsConnected(false);
        $this->close_input();
        $this->close_socket();
    }

    /**
     * Returns the lowest channel id in 1..channel_max that is not in use.
     *
     * @return int
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When every id is taken
     */
    public function get_free_channel_id()
    {
        for ($i = 1; $i <= $this->channel_max; $i++) {
            if (!isset($this->channels[$i])) {
                return $i;
            }
        }

        throw new AMQPRuntimeException('No free channel ids');
    }

    /**
     * Serializes a content header plus body frames and writes them to the server.
     *
     * @param int $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter|null $pkt Optional writer to append to; a new one is used when null
     */
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        // Use the returned writer: when $pkt is null, prepare_content() creates
        // the writer itself and the local $pkt would otherwise still be null.
        $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
        $this->write($pkt->getvalue());
    }

    /**
     * Returns a new AMQPWriter or mutates the provided $pkt
     *
     * Appends one content-header frame (type 2) followed by as many body
     * frames (type 3) as are needed to carry $body in chunks of at most
     * frame_max - 8 bytes.
     *
     * @param int $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter|null $pkt
     * @return AMQPWriter
     */
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        $pkt = $pkt ?: new AMQPWriter();

        // Content already prepared ?
        $key_cache = sprintf(
            '%s|%s|%s|%s',
            $channel,
            $packed_properties,
            $class_id,
            $weight
        );

        if (!isset($this->prepare_content_cache[$key_cache])) {
            $w = new AMQPWriter();
            $w->write_octet(2);
            $w->write_short($channel);
            // 12 = class_id (2) + weight (2) + body_size (8), written around the properties
            $w->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $w->write_short($class_id);
            $w->write_short($weight);
            $this->prepare_content_cache[$key_cache] = $w->getvalue();
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                // Evict the first (oldest inserted) entry
                reset($this->prepare_content_cache);
                $old_key = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$old_key]);
            }
        }
        $pkt->write($this->prepare_content_cache[$key_cache]);

        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);

        $pkt->write_octet(0xCE);

        // memory efficiency: walk the string instead of biting
        // it. good for very large packets (close in size to
        // memory_limit setting)
        // 8 = per-frame overhead: type (1) + channel (2) + size (4) + frame end (1)
        $chunk_size = $this->frame_max - 8;
        $position = 0;
        $bodyLength = mb_strlen($body, 'ASCII');
        while ($position < $bodyLength) {
            $payload = mb_substr($body, $position, $chunk_size, 'ASCII');
            $position += $chunk_size;

            $pkt->write_octet(3);
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($payload, 'ASCII'));

            $pkt->write($payload);

            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }

    /**
     * Serializes a method frame and writes it to the server.
     *
     * @param int $channel
     * @param array $method_sig Pair of (class_id, method_id)
     * @param string|AMQPWriter $args
     * @param AMQPWriter|null $pkt
     */
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
        $this->write($pkt->getvalue());
        $this->debug->debug_method_signature1($method_sig);
    }

    /**
     * Returns a new AMQPWriter or mutates the provided $pkt
     *
     * Appends a single method frame (type 1) for the given signature.
     *
     * @param int $channel
     * @param array $method_sig Pair of (class_id, method_id)
     * @param string|AMQPWriter $args Already serialized arguments, or a writer holding them
     * @param AMQPWriter|null $pkt
     * @return AMQPWriter
     */
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        if ($args instanceof AMQPWriter) {
            $args = $args->getvalue();
        }

        $pkt = $pkt ?: new AMQPWriter();

        $pkt->write_octet(1);
        $pkt->write_short($channel);
        // 4 = length of class_id and method_id in payload
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4);

        $pkt->write_short($method_sig[0]); // class_id
        $pkt->write_short($method_sig[1]); // method_id
        $pkt->write($args);

        $pkt->write_octet(0xCE);

        $this->debug->debug_method_signature1($method_sig);

        return $pkt;
    }

    /**
     * Waits for a frame from the server
     *
     * The input timeout is switched to $timeout for the duration of the read
     * and restored afterwards (also when the read times out).
     *
     * @param int $timeout
     * @return array Triple of (frame_type, channel, payload)
     * @throws \Exception
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When the input is closed or the
     *                                                     frame does not end with 0xCE
     */
    protected function wait_frame($timeout = 0)
    {
        if (is_null($this->input)) {
            $this->setIsConnected(false);
            throw new AMQPRuntimeException('Broken pipe or closed connection');
        }

        $currentTimeout = $this->input->getTimeout();
        $this->input->setTimeout($timeout);

        try {
            // frame_type + channel_id + size
            $this->wait_frame_reader->reuse(
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
            );

            $frame_type = $this->wait_frame_reader->read_octet();
            $channel = $this->wait_frame_reader->read_short();
            $size = $this->wait_frame_reader->read_long();

            // payload + ch
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

            $payload = $this->wait_frame_reader->read($size);
            $ch = $this->wait_frame_reader->read_octet();
        } catch (AMQPTimeoutException $e) {
            $this->input->setTimeout($currentTimeout);
            throw $e;
        }

        $this->input->setTimeout($currentTimeout);

        if ($ch != 0xCE) {
            throw new AMQPRuntimeException(sprintf(
                'Framing error, unexpected byte: %x',
                $ch
            ));
        }

        return array($frame_type, $channel, $payload);
    }

    /**
     * Waits for a frame from the server destined for a particular channel.
     *
     * Heartbeat frames (type 8 on channel 0) are skipped, and the time spent
     * on them is deducted from the remaining timeout. Frames for other
     * channels are queued on those channels.
     *
     * @param int $channel_id
     * @param int $timeout
     * @return array Pair of (frame_type, payload)
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     */
    protected function wait_channel($channel_id, $timeout = 0)
    {
        // Keeping the original timeout unchanged.
        $_timeout = $timeout;
        while (true) {
            $now = time();
            list($frame_type, $frame_channel, $payload) = $this->wait_frame($_timeout);

            if ($frame_channel === 0 && $frame_type === 8) {
                // skip heartbeat frames and reduce the timeout by the time passed
                if ($_timeout > 0) {
                    $_timeout -= time() - $now;
                    if ($_timeout <= 0) {
                        // If timeout has been reached, throw the exception without calling wait_frame
                        throw new AMQPTimeoutException("Timeout waiting on channel");
                    }
                }
                continue;
            }

            if ($frame_channel == $channel_id) {
                return array($frame_type, $payload);
            }

            // Not the channel we were looking for.  Queue this frame
            // for later, when the other channel is looking for frames.
            // Make sure the channel still exists, it could have been
            // closed by a previous Exception.
            if (isset($this->channels[$frame_channel])) {
                array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
            }

            // If we just queued up a method for channel 0 (the Connection
            // itself) it's probably a close method in reaction to some
            // error, so deal with it right away.
            if (($frame_type == 1) && ($frame_channel == 0)) {
                $this->wait();
            }
        }
    }

    /**
     * Fetches a channel object identified by the numeric channel_id, or
     * create that object if it doesn't already exist.
     *
     * @param int|null $channel_id Falsy values select the next free channel id
     * @return AMQPChannel
     */
    public function channel($channel_id = null)
    {
        if (isset($this->channels[$channel_id])) {
            return $this->channels[$channel_id];
        }

        $channel_id = $channel_id ? $channel_id : $this->get_free_channel_id();
        $ch = new AMQPChannel($this->connection, $channel_id);
        $this->channels[$channel_id] = $ch;

        return $ch;
    }

    /**
     * Requests a connection close
     *
     * Closes all channels, sends connection.close and waits for
     * connection.close-ok. Does nothing (returns null) when no protocol
     * writer is set or the connection is not flagged as connected.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig
     * @return mixed|null
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        if ($this->io instanceof StreamIO) {
            $this->io->disableHeartbeat();
        }

        if (!$this->protocolWriter || !$this->isConnected()) {
            return null;
        }

        $this->closeChannels();

        list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        $this->send_method_frame(array($class_id, $method_id), $args);

        $this->setIsConnected(false);

        return $this->wait(array(
            $this->waitHelper->get_wait('connection.close_ok')
        ));
    }

    /**
     * Handles a connection.close sent by the server: acknowledges it and
     * reports the server's reason as an exception.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException Always
     */
    protected function connection_close(AMQPReader $reader)
    {
        $reply_code = $reader->read_short();
        $reply_text = $reader->read_shortstr();
        $class_id = $reader->read_short();
        $method_id = $reader->read_short();

        $this->x_close_ok();

        throw new AMQPProtocolConnectionException($reply_code, $reply_text, array($class_id, $method_id));
    }

    /**
     * Confirms a connection close
     */
    protected function x_close_ok()
    {
        // get_wait() yields a "class_id,method_id" string; split it into a method signature
        $this->send_method_frame(
            explode(',', $this->waitHelper->get_wait('connection.close_ok'))
        );
        $this->do_close();
    }

    /**
     * Confirm a connection close
     *
     * @param AMQPReader $args Unused
     */
    protected function connection_close_ok($args)
    {
        $this->do_close();
    }

    /**
     * Sends connection.open and waits for the server's answer.
     *
     * @param string $virtual_host
     * @param string $capabilities
     * @param bool $insist
     * @return mixed Result of wait(); connect() treats a truthy value as a redirect host
     */
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $args = new AMQPWriter();
        $args->write_shortstr($virtual_host);
        $args->write_shortstr($capabilities);
        $args->write_bits(array($insist));
        $this->send_method_frame(array(10, 40), $args);

        $wait = array(
            $this->waitHelper->get_wait('connection.open_ok')
        );

        // connection.redirect is only waited for on protocol 0.8
        if ($this->protocolVersion == '0.8') {
            $wait[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($wait);
    }

    /**
     * Signals that the connection is ready
     *
     * @param AMQPReader $args
     */
    protected function connection_open_ok($args)
    {
        $this->known_hosts = $args->read_shortstr();
        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
    }

    /**
     * Asks the client to use a different server
     *
     * @param AMQPReader $args
     * @return string Host the server redirected to
     */
    protected function connection_redirect($args)
    {
        $host = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();
        $this->debug->debug_msg(sprintf(
            'Redirected to [%s], known_hosts [%s]',
            $host,
            $this->known_hosts
        ));

        return $host;
    }

    /**
     * Security mechanism challenge
     *
     * The challenge is read from the frame but not acted upon.
     *
     * @param AMQPReader $args
     */
    protected function connection_secure($args)
    {
        // Consume the challenge field; its value is intentionally discarded
        $args->read_longstr();
    }

    /**
     * Security mechanism response
     *
     * @param string $response
     */
    protected function x_secure_ok($response)
    {
        $args = new AMQPWriter();
        $args->write_longstr($response);
        $this->send_method_frame(array(10, 21), $args);
    }

    /**
     * Starts connection negotiation
     *
     * Records the server's protocol version, properties, mechanisms and locales.
     *
     * @param AMQPReader $args
     */
    protected function connection_start($args)
    {
        $this->version_major = $args->read_octet();
        $this->version_minor = $args->read_octet();
        $this->server_properties = $args->read_table();
        $this->mechanisms = explode(' ', $args->read_longstr());
        $this->locales = explode(' ', $args->read_longstr());

        $this->debug->debug_connection_start(
            $this->version_major,
            $this->version_minor,
            $this->server_properties,
            $this->mechanisms,
            $this->locales
        );
    }

    /**
     * Sends connection.start-ok with the client properties and login data.
     *
     * @param AMQPTable|array $clientProperties
     * @param string $mechanism
     * @param string $response
     * @param string $locale
     */
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
    {
        $args = new AMQPWriter();
        $args->write_table($clientProperties);
        $args->write_shortstr($mechanism);
        $args->write_longstr($response);
        $args->write_shortstr($locale);
        $this->send_method_frame(array(10, 11), $args);
    }

    /**
     * Proposes connection tuning parameters
     *
     * Non-zero channel_max / frame_max values from the server replace the
     * local ones; the result is confirmed with x_tune_ok().
     *
     * @param AMQPReader $args
     */
    protected function connection_tune($args)
    {
        $v = $args->read_short();
        if ($v) {
            $this->channel_max = $v;
        }

        $v = $args->read_long();
        if ($v) {
            $this->frame_max = $v;
        }

        // use server proposed value if not set
        if ($this->heartbeat === null) {
            $this->heartbeat = $args->read_short();
        }

        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
    }

    /**
     * Negotiates connection tuning parameters
     *
     * Sends connection.tune-ok and clears the flag connect() is looping on.
     *
     * @param int $channel_max
     * @param int $frame_max
     * @param int $heartbeat
     */
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        $args = new AMQPWriter();
        $args->write_short($channel_max);
        $args->write_long($frame_max);
        $args->write_short($heartbeat);
        $this->send_method_frame(array(10, 31), $args);
        $this->wait_tune_ok = false;
    }

    /**
     * Returns whatever the IO object's getSocket() returns.
     *
     * @return mixed NOTE(review): previously documented as SocketIO; presumably the
     *               underlying socket/stream handle - confirm against AbstractIO
     */
    public function getSocket()
    {
        return $this->io->getSocket();
    }

    /**
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
     */
    protected function getIO()
    {
        return $this->io;
    }

    /**
     * Handles connection blocked notifications
     *
     * @param AMQPReader $args
     */
    protected function connection_blocked(AMQPReader $args)
    {
        // Call the block handler and pass in the reason
        $this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr()));
    }

    /**
     * Handles connection unblocked notifications
     *
     * @param AMQPReader $args Unused
     */
    protected function connection_unblocked(AMQPReader $args)
    {
        // No args to an unblock event
        $this->dispatch_to_handler($this->connection_unblock_handler, array());
    }

    /**
     * Sets a handler which is called whenever a connection.block is sent from the server
     *
     * @param callable $callback Receives the block reason as its only argument
     */
    public function set_connection_block_handler($callback)
    {
        $this->connection_block_handler = $callback;
    }

    /**
     * Sets a handler which is called whenever a connection.unblock is sent from the server
     *
     * @param callable $callback Called without arguments
     */
    public function set_connection_unblock_handler($callback)
    {
        $this->connection_unblock_handler = $callback;
    }

    /**
     * Gets the connection status
     *
     * @return bool
     */
    public function isConnected()
    {
        return (bool) $this->is_connected;
    }

    /**
     * Set the connection status
     *
     * @param bool $is_connected
     */
    protected function setIsConnected($is_connected)
    {
        $this->is_connected = (bool) $is_connected;
    }

    /**
     * Closes all available channels
     *
     * Errors raised while closing an individual channel are ignored so the
     * remaining channels still get closed.
     */
    protected function closeChannels()
    {
        foreach ($this->channels as $key => $channel) {
            // channels[0] is this connection object, so don't close it yet
            if ($key === 0) {
                continue;
            }
            try {
                $channel->close();
            } catch (\Exception $e) {
                /* Ignore closing errors */
            }
        }
    }

    /**
     * Should the connection be attempted during construction?
     *
     * @return bool
     */
    public function connectOnConstruct()
    {
        return true;
    }
}
938