Completed
Push — master ( a59bb7...a92e1b )
by
unknown
22:43
created

PhpAmqpLib/Connection/AbstractConnection.php (1 issue)

Labels
Severity

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis. Consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AMQPChannel;
5
use PhpAmqpLib\Channel\AbstractChannel;
6
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPTable;
11
use PhpAmqpLib\Wire\AMQPWriter;
12
use PhpAmqpLib\Wire\IO\AbstractIO;
13
use PhpAmqpLib\Wire\IO\SocketIO;
14
use PhpAmqpLib\Wire\IO\StreamIO;
15
16
class AbstractConnection extends AbstractChannel
17
{
18
    /** @var array */
19
    public static $LIBRARY_PROPERTIES = array(
20
        'product' => array('S', 'AMQPLib'),
21
        'platform' => array('S', 'PHP'),
22
        'version' => array('S', '2.6'),
23
        'information' => array('S', ''),
24
        'copyright' => array('S', ''),
25
        'capabilities' => array(
26
            'F',
27
            array(
28
                'authentication_failure_close' => array('t', true),
29
                'publisher_confirms' => array('t', true),
30
                'consumer_cancel_notify' => array('t', true),
31
                'exchange_exchange_bindings' => array('t', true),
32
                'basic.nack' => array('t', true),
33
                'connection.blocked' => array('t', true)
34
            )
35
        )
36
    );
37
38
    /** @var AMQPChannel[] */
39
    public $channels = array();
40
41
    /** @var int */
42
    protected $version_major;
43
44
    /** @var int */
45
    protected $version_minor;
46
47
    /** @var array */
48
    protected $server_properties;
49
50
    /** @var array */
51
    protected $mechanisms;
52
53
    /** @var array */
54
    protected $locales;
55
56
    /** @var bool */
57
    protected $wait_tune_ok;
58
59
    /** @var string */
60
    protected $known_hosts;
61
62
    /** @var AMQPReader */
63
    protected $input;
64
65
    /** @var string */
66
    protected $vhost;
67
68
    /** @var bool */
69
    protected $insist;
70
71
    /** @var string */
72
    protected $login_method;
73
74
    /** @var string */
75
    protected $login_response;
76
77
    /** @var string */
78
    protected $locale;
79
80
    /** @var int */
81
    protected $heartbeat;
82
83
    /** @var SocketIO */
84
    protected $sock;
85
86
    /** @var int */
87
    protected $channel_max = 65535;
88
89
    /** @var int */
90
    protected $frame_max = 131072;
91
92
     /** @var array Constructor parameters for clone */
93
    protected $construct_params;
94
95
    /** @var bool Close the connection in destructor */
96
    protected $close_on_destruct = true;
97
98
    /** @var bool Maintain connection status */
99
    protected $is_connected = false;
100
101
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
102
    protected $io;
103
104
    /** @var \PhpAmqpLib\Wire\AMQPReader */
105
    protected $wait_frame_reader;
106
107
    /** @var callable Handles connection blocking from the server */
108
    private $connection_block_handler;
109
110
    /** @var callable Handles connection unblocking from the server */
111
    private $connection_unblock_handler;
112
113
    /**
114
     * Circular buffer to speed up prepare_content().
115
     * Max size limited by $prepare_content_cache_max_size.
116
     *
117
     * @var array
118
     * @see prepare_content()
119
     */
120
    private $prepare_content_cache;
121
122
    /** @var int Maximal size of $prepare_content_cache */
123
    private $prepare_content_cache_max_size;
124
125
    /**
     * Stores the connection settings, builds the login response from the
     * credentials and, when connectOnConstruct() returns true, connects.
     *
     * Note that $io is a required argument even though it follows optional ones.
     *
     * @param string $user
     * @param string $password
     * @param string $vhost
     * @param bool $insist
     * @param string $login_method
     * @param string|null $login_response Pre-built login response; kept as-is when
     *                                    $user or $password is empty, otherwise
     *                                    replaced by one built from the credentials
     * @param string $locale
     * @param AbstractIO $io
     * @param int $heartbeat
     * @throws \Exception
     */
    public function __construct(
        $user,
        $password,
        $vhost = '/',
        $insist = false,
        $login_method = 'AMQPLAIN',
        $login_response = null,
        $locale = 'en_US',
        AbstractIO $io,
        $heartbeat = 0
    ) {
        // save the params for the use of __clone
        $this->construct_params = func_get_args();

        $this->wait_frame_reader = new AMQPReader(null);
        $this->vhost = $vhost;
        $this->insist = $insist;
        $this->login_method = $login_method;
        $this->login_response = $login_response;
        $this->locale = $locale;
        $this->io = $io;
        $this->heartbeat = $heartbeat;

        if ($user && $password) {
            $credentials = new AMQPWriter();
            $credentials->write_table(array(
                'LOGIN' => array('S', $user),
                'PASSWORD' => array('S', $password)
            ));

            // Skip the length: drop the first 4 bytes of the serialized table
            $responseValue = $credentials->getvalue();
            $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
        }
        // Without credentials the caller-supplied $login_response assigned above
        // is kept (it used to be overwritten with null, making the parameter dead).

        $this->prepare_content_cache = array();
        $this->prepare_content_cache_max_size = 100;

        // Lazy Connection waits on connecting
        if ($this->connectOnConstruct()) {
            $this->connect();
        }
    }
183
184
    /**
185
     * Connects to the AMQP server
186
     */
187 84
    protected function connect()
188
    {
189
        try {
190
            // Loop until we connect
191 84
            while (!$this->isConnected()) {
192
                // Assume we will connect, until we don't
193 84
                $this->setIsConnected(true);
194
195
                // Connect the socket
196 84
                $this->getIO()->connect();
197
198 78
                $this->channels = array();
199
                // The connection object itself is treated as channel 0
200 78
                parent::__construct($this, 0);
201
202 78
                $this->input = new AMQPReader(null, $this->getIO());
203
204 78
                $this->write($this->amqp_protocol_header);
205 78
                $this->wait(array($this->waitHelper->get_wait('connection.start')));
206 78
                $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale);
207
208 78
                $this->wait_tune_ok = true;
209 78
                while ($this->wait_tune_ok) {
210 78
                    $this->wait(array(
211 78
                        $this->waitHelper->get_wait('connection.secure'),
212 78
                        $this->waitHelper->get_wait('connection.tune')
213 65
                    ));
214 65
                }
215
216 78
                $host = $this->x_open($this->vhost, '', $this->insist);
217 78
                if (!$host) {
218 78
                    //Reconnected
219
                    if ($this->io instanceof StreamIO)
220
                    {
221
                        $this->getIO()->reenableHeartbeat();
0 ignored issues
show
It seems you are coding against a specific sub-type rather than the parent class PhpAmqpLib\Wire\IO\AbstractIO: the method reenableHeartbeat() exists only in the following sub-class of PhpAmqpLib\Wire\IO\AbstractIO: PhpAmqpLib\Wire\IO\StreamIO. Maybe you want to add an explicit instanceof check for it?

Let’s take a look at an example:

abstract class User
{
    /** @return string */
    abstract public function getPassword();
}

class MyUser extends User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you only pass instances of MyUser. However, if you later pass a different sub-class of User that does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  3. Add the method to the parent class:

    abstract class User
    {
        /** @return string */
        abstract public function getPassword();
    
        /** @return string */
        abstract public function getDisplayName();
    }
    
Loading history...
222
                    }
223
                    return null; // we weren't redirected
224
                }
225
226
                $this->setIsConnected(false);
227
                $this->closeChannels();
228 26
229
                // we were redirected, close the socket, loop and try again
230 6
                $this->close_socket();
231 6
            }
232 6
233
        } catch (\Exception $e) {
234 24
            // Something went wrong, set the connection status
235
            $this->setIsConnected(false);
236
            $this->closeChannels();
237
            throw $e; // Rethrow exception
238
        }
239
    }
240 36
241
    /**
     * Drops the current connection and connects again with the original settings.
     * Channels that were open before are not recreated.
     */
    public function reconnect()
    {
        // Best-effort close of the AMQP connection
        $this->safeClose();

        // Bring the socket/stream back first, then redo the AMQP handshake
        $io = $this->getIO();
        $io->reconnect();

        // getIO can initiate the connection setting via LazyConnection, so reset the flag to be sure
        $this->setIsConnected(false);
        $this->connect();
    }
254
255
    /**
     * Re-runs the constructor on the clone with the arguments saved at
     * construction time, so the clone connects to the same server.
     */
    public function __clone()
    {
        $constructor = array($this, '__construct');
        call_user_func_array($constructor, $this->construct_params);
    }
262
263
    /**
     * Closes the connection on destruction unless that was disabled through
     * set_close_on_destruct().
     */
    public function __destruct()
    {
        if (!$this->close_on_destruct) {
            return;
        }

        $this->safeClose();
    }
269
270
    /**
     * Closes the connection if an input reader exists, ignoring any exception
     * raised while doing so.
     */
    protected function safeClose()
    {
        try {
            if (!empty($this->input)) {
                $this->close();
            }
        } catch (\Exception $e) {
            // Intentionally ignored: closing is best-effort
        }
    }
283
284
    /**
     * Delegates to the IO object's select().
     *
     * @param int $sec
     * @param int $usec
     * @return mixed Whatever the IO object's select() returns
     */
    public function select($sec, $usec = 0)
    {
        $io = $this->getIO();

        return $io->select($sec, $usec);
    }
293
294
    /**
     * Controls whether the destructor closes the connection.
     * Useful after a fork, when the parent process connection must stay open.
     *
     * @param bool $close
     */
    public function set_close_on_destruct($close = true)
    {
        $this->close_on_destruct = $close ? true : false;
    }
304 78
305 78
    /**
     * Closes the input reader, if there is one, and clears the reference.
     */
    protected function close_input()
    {
        $this->debug->debug_msg('closing input');

        if ($this->input === null) {
            return;
        }

        $this->input->close();
        $this->input = null;
    }
314 78
315 78
    /**
     * Closes the IO object, if one is available.
     */
    protected function close_socket()
    {
        $this->debug->debug_msg('closing socket');

        if ($this->getIO() === null) {
            return;
        }

        $this->getIO()->close();
    }
323
324 78
    /**
     * Writes raw data through the IO object.
     * If the write raises an AMQPRuntimeException the connection is flagged
     * as disconnected before the exception is rethrown.
     *
     * @param string $data
     * @throws AMQPRuntimeException
     */
    public function write($data)
    {
        $this->debug->debug_hexdump($data);

        try {
            $this->getIO()->write($data);
        } catch (AMQPRuntimeException $failure) {
            $this->setIsConnected(false);

            throw $failure;
        }
    }
338 78
339 78
    /**
     * Marks the connection as closed, then closes the input reader and the socket.
     */
    protected function do_close()
    {
        $this->setIsConnected(false);

        $this->close_input();
        $this->close_socket();
    }
345 78
346
    /**
     * Returns the lowest channel id, from 1 up to channel_max, that has no
     * entry in $this->channels.
     *
     * @return int
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when every id is taken
     */
    public function get_free_channel_id()
    {
        $candidate = 1;
        while ($candidate <= $this->channel_max) {
            if (!isset($this->channels[$candidate])) {
                return $candidate;
            }
            $candidate++;
        }

        throw new AMQPRuntimeException('No free channel ids');
    }
360
361
    /**
     * Builds the content header and body frames for a message and writes
     * them to the server.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter $pkt Optional writer to append to; a new one is created when omitted
     */
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        // Use the returned writer: prepare_content() creates one when $pkt is null,
        // so reading from the local $pkt would call getvalue() on null.
        $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
        $this->write($pkt->getvalue());
    }
375
376
    /**
     * Writes a content header frame followed by the body frames into a writer.
     * Returns a new AMQPWriter or mutates the provided $pkt.
     *
     * The leading part of the header frame is cached per
     * channel/properties/class/weight combination in $prepare_content_cache.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter $pkt
     * @return AMQPWriter
     */
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        $pkt = $pkt ?: new AMQPWriter();

        // Content already prepared ?
        $key_cache = implode('|', array($channel, $packed_properties, $class_id, $weight));

        if (!isset($this->prepare_content_cache[$key_cache])) {
            $header = new AMQPWriter();
            $header->write_octet(2);
            $header->write_short($channel);
            $header->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $header->write_short($class_id);
            $header->write_short($weight);
            $this->prepare_content_cache[$key_cache] = $header->getvalue();

            // Over the size limit: drop the oldest (first) entry
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                reset($this->prepare_content_cache);
                $oldest = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$oldest]);
            }
        }
        $pkt->write($this->prepare_content_cache[$key_cache]);

        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);

        $pkt->write_octet(0xCE);

        // memory efficiency: walk the string in slices instead of copying it
        // whole. good for very large packets (close in size to the
        // memory_limit setting)
        $chunkSize = $this->frame_max - 8;
        $bodyLength = mb_strlen($body, 'ASCII');
        for ($position = 0; $position < $bodyLength; $position += $chunkSize) {
            $payload = mb_substr($body, $position, $chunkSize, 'ASCII');

            $pkt->write_octet(3);
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($payload, 'ASCII'));

            $pkt->write($payload);

            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }
443
444
    /**
     * Builds a method frame for the given channel and writes it to the server.
     *
     * @param string $channel
     * @param array $method_sig
     * @param AMQPWriter|string $args
     * @param AMQPWriter|null $pkt
     */
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $frame = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);

        $this->write($frame->getvalue());
        $this->debug->debug_method_signature1($method_sig);
    }
456
457
    /**
     * Writes a method frame into a writer.
     * Returns a new AMQPWriter or mutates the provided $pkt.
     *
     * @param string $channel
     * @param array $method_sig Pair of class id (index 0) and method id (index 1)
     * @param AMQPWriter|string $args
     * @param AMQPWriter $pkt
     * @return AMQPWriter
     */
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $arguments = ($args instanceof AMQPWriter) ? $args->getvalue() : $args;

        $frame = $pkt ?: new AMQPWriter();

        $frame->write_octet(1);
        $frame->write_short($channel);
        // 4 = length of class_id and method_id in payload
        $frame->write_long(mb_strlen($arguments, 'ASCII') + 4);

        $frame->write_short($method_sig[0]); // class_id
        $frame->write_short($method_sig[1]); // method_id
        $frame->write($arguments);

        $frame->write_octet(0xCE);

        $this->debug->debug_method_signature1($method_sig);

        return $frame;
    }
489
490
    /**
     * Waits for a frame from the server.
     *
     * The reader timeout is set to $timeout for the duration of the read and
     * restored afterwards, including when the read fails with an exception.
     *
     * @param int $timeout
     * @return array Frame type, channel and payload, in that order
     * @throws \Exception
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException on a missing input reader,
     *         an unknown frame type or a bad frame-end byte
     */
    protected function wait_frame($timeout = 0)
    {
        if (is_null($this->input))
        {
            $this->setIsConnected(false);
            throw new AMQPRuntimeException('Broken pipe or closed connection');
        }

        $currentTimeout = $this->input->getTimeout();
        $this->input->setTimeout($timeout);

        try {
            // frame_type + channel_id + size
            $this->wait_frame_reader->reuse(
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
            );

            $frame_type = $this->wait_frame_reader->read_octet();
            $class = self::$PROTOCOL_CONSTANTS_CLASS;
            if (!array_key_exists($frame_type, $class::$FRAME_TYPES)) {
                throw new AMQPRuntimeException('Invalid frame type ' . $frame_type);
            }
            $channel = $this->wait_frame_reader->read_short();
            $size = $this->wait_frame_reader->read_long();

            // payload + ch
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

            $payload = $this->wait_frame_reader->read($size);
            $ch = $this->wait_frame_reader->read_octet();

        } catch (\Exception $e) {
            // Restore the previous timeout on every failure, not just on timeouts,
            // so a failed read does not leave the temporary timeout in place.
            if (!is_null($this->input)) {
                $this->input->setTimeout($currentTimeout);
            }
            throw $e;
        }

        $this->input->setTimeout($currentTimeout);

        if ($ch != 0xCE) {
            throw new AMQPRuntimeException(sprintf(
                'Framing error, unexpected byte: %x',
                $ch
            ));
        }

        return array($frame_type, $channel, $payload);
    }
546
547
    /**
     * Waits for a frame from the server destined for a particular channel.
     *
     * Heartbeat frames (type 8 on channel 0) are skipped. Frames for other
     * channels are queued on those channels.
     *
     * @param string $channel_id
     * @param int $timeout
     * @return array Frame type and payload, in that order
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     */
    protected function wait_channel($channel_id, $timeout = 0)
    {
        // Work on a copy so the original timeout stays unchanged.
        $remaining = $timeout;

        while (true) {
            $started = time();
            list($frame_type, $frame_channel, $payload) = $this->wait_frame($remaining);

            if ($frame_channel === 0 && $frame_type === 8) {
                // skip heartbeat frames and reduce the timeout by the time passed
                if ($remaining > 0) {
                    $remaining -= time() - $started;
                    if ($remaining <= 0) {
                        // If timeout has been reached, throw the exception without calling wait_frame
                        throw new AMQPTimeoutException("Timeout waiting on channel");
                    }
                }
                continue;
            }

            if ($frame_channel == $channel_id) {
                return array($frame_type, $payload);
            }

            // Not the channel we were looking for. Queue this frame for
            // later, when the other channel is looking for frames.
            // Make sure the channel still exists, it could have been
            // closed by a previous Exception.
            if (isset($this->channels[$frame_channel])) {
                array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
            }

            // If we just queued up a method for channel 0 (the Connection
            // itself) it's probably a close method in reaction to some
            // error, so deal with it right away.
            if (($frame_type == 1) && ($frame_channel == 0)) {
                $this->wait();
            }
        }
    }
596
597
    /**
     * Returns the channel object registered under $channel_id, creating and
     * registering one when it does not exist yet. Without an id, the next
     * free channel id is used.
     *
     * @param string $channel_id
     * @return AMQPChannel
     */
    public function channel($channel_id = null)
    {
        if (isset($this->channels[$channel_id])) {
            return $this->channels[$channel_id];
        }

        if (!$channel_id) {
            $channel_id = $this->get_free_channel_id();
        }

        $created = new AMQPChannel($this->connection, $channel_id);
        $this->channels[$channel_id] = $created;

        return $created;
    }
616
617
    /**
     * Requests a connection close.
     *
     * Does nothing and returns null when there is no protocol writer or the
     * connection is not marked as connected. Otherwise closes the channels,
     * sends connection.close and waits for connection.close_ok.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig
     * @return mixed|null
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        if ($this->io instanceof StreamIO) {
            $this->io->disableHeartbeat();
        }

        if (empty($this->protocolWriter) || !$this->isConnected()) {
            return null;
        }

        $this->closeChannels();

        $closeFrame = $this->protocolWriter->connectionClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        list($class_id, $method_id, $args) = $closeFrame;
        $this->send_method_frame(array($class_id, $method_id), $args);

        $this->setIsConnected(false);

        $expected = array($this->waitHelper->get_wait('connection.close_ok'));

        return $this->wait($expected);
    }
652
653
    /**
     * Handles a connection.close sent by the server: reads the reason,
     * confirms the close and reports it as an exception.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException always
     */
    protected function connection_close(AMQPReader $reader)
    {
        $reply_code = $reader->read_short();
        $reply_text = $reader->read_shortstr();
        $class_id = $reader->read_short();
        $method_id = $reader->read_short();
        $failed_method = array($class_id, $method_id);

        $this->x_close_ok();

        throw new AMQPProtocolConnectionException($reply_code, $reply_text, $failed_method);
    }
668
669
    /**
     * Confirms a connection close: sends connection.close_ok, then closes
     * the connection locally.
     */
    protected function x_close_ok()
    {
        $method_sig = explode(',', $this->waitHelper->get_wait('connection.close_ok'));
        $this->send_method_frame($method_sig);

        $this->do_close();
    }
679
680 78
    /**
     * Handles the connection.close_ok confirmation by closing the connection locally.
     *
     * @param AMQPReader $args Not used
     */
    protected function connection_close_ok($args)
    {
        $this->do_close();
    }
689
690
    /**
     * Sends connection.open for a virtual host and waits for the reply.
     * With protocol version 0.8 a connection.redirect reply is accepted too.
     *
     * @param string $virtual_host
     * @param string $capabilities
     * @param bool $insist
     * @return mixed Result of wait() for the reply
     */
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $request = new AMQPWriter();
        $request->write_shortstr($virtual_host);
        $request->write_shortstr($capabilities);
        $request->write_bits(array($insist));
        $this->send_method_frame(array(10, 40), $request);

        $expected = array($this->waitHelper->get_wait('connection.open_ok'));
        if ($this->protocolVersion == '0.8') {
            $expected[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($expected);
    }
714
715 78
    /**
     * Handles connection.open_ok: records the known hosts reported by the server.
     *
     * @param AMQPReader $args
     */
    protected function connection_open_ok($args)
    {
        $known_hosts = $args->read_shortstr();
        $this->known_hosts = $known_hosts;

        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
    }
725
726
    /**
     * Handles connection.redirect: the server asks the client to use a
     * different server.
     *
     * @param AMQPReader $args
     * @return string The host to redirect to
     */
    protected function connection_redirect($args)
    {
        $host = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();

        $message = sprintf(
            'Redirected to [%s], known_hosts [%s]',
            $host,
            $this->known_hosts
        );
        $this->debug->debug_msg($message);

        return $host;
    }
744
745
    /**
     * Handles a security mechanism challenge.
     * The challenge string is read from the frame but not used.
     *
     * @param AMQPReader $args
     */
    protected function connection_secure($args)
    {
        $args->read_longstr();
    }
754
755
    /**
     * Sends the security mechanism response (method 10, 21).
     *
     * @param string $response
     */
    protected function x_secure_ok($response)
    {
        $payload = new AMQPWriter();
        $payload->write_longstr($response);

        $this->send_method_frame(array(10, 21), $payload);
    }
766
767 78
    /**
     * Handles connection.start: stores the server's protocol version,
     * properties, mechanisms and locales, then logs them.
     *
     * @param AMQPReader $args
     */
    protected function connection_start($args)
    {
        $this->version_major = $args->read_octet();
        $this->version_minor = $args->read_octet();
        $this->server_properties = $args->read_table();

        // Mechanisms and locales arrive as space-separated lists
        $mechanisms = $args->read_longstr();
        $this->mechanisms = explode(' ', $mechanisms);
        $locales = $args->read_longstr();
        $this->locales = explode(' ', $locales);

        $this->debug->debug_connection_start(
            $this->version_major,
            $this->version_minor,
            $this->server_properties,
            $this->mechanisms,
            $this->locales
        );
    }
788
789
    /**
     * Sends connection.start_ok (method 10, 11) with the client properties,
     * the chosen mechanism, the login response and the locale.
     *
     * @param AMQPTable|array $clientProperties
     * @param string $mechanism
     * @param string $response
     * @param string $locale
     */
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
    {
        $payload = new AMQPWriter();
        $payload->write_table($clientProperties);
        $payload->write_shortstr($mechanism);
        $payload->write_longstr($response);
        $payload->write_shortstr($locale);

        $this->send_method_frame(array(10, 11), $payload);
    }
804
805 78
    /**
     * Handles connection.tune: adopts the server's non-zero channel_max and
     * frame_max, takes the server's heartbeat only when none is set locally,
     * and replies with connection.tune_ok.
     *
     * @param AMQPReader $args
     */
    protected function connection_tune($args)
    {
        $proposed_channel_max = $args->read_short();
        if ($proposed_channel_max) {
            $this->channel_max = $proposed_channel_max;
        }

        $proposed_frame_max = $args->read_long();
        if ($proposed_frame_max) {
            $this->frame_max = $proposed_frame_max;
        }

        // use server proposed value if not set
        if (null === $this->heartbeat) {
            $this->heartbeat = $args->read_short();
        }

        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
    }
829
830
    /**
     * Sends connection.tune_ok (method 10, 31) with the negotiated values and
     * clears the wait_tune_ok flag.
     *
     * @param int $channel_max
     * @param int $frame_max
     * @param int $heartbeat
     */
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        $payload = new AMQPWriter();
        $payload->write_short($channel_max);
        $payload->write_long($frame_max);
        $payload->write_short($heartbeat);
        $this->send_method_frame(array(10, 31), $payload);

        // Tuning is done; connect() stops looping on this flag
        $this->wait_tune_ok = false;
    }
846
847
    /**
     * Returns whatever the IO object's getSocket() returns.
     *
     * NOTE(review): the annotated return type is carried over unchanged and
     * cannot be confirmed from this file.
     *
     * @return SocketIO
     */
    public function getSocket()
    {
        $socket = $this->io->getSocket();

        return $socket;
    }
854
855 60
    /**
     * Returns the IO object this connection reads from and writes to.
     *
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
     */
    protected function getIO()
    {
        return $this->io;
    }
862
863
    /**
     * Handles a connection blocked notification by passing the reason sent
     * by the server to the registered block handler.
     *
     * @param AMQPReader $args
     */
    protected function connection_blocked(AMQPReader $args)
    {
        $reason = $args->read_shortstr();

        $this->dispatch_to_handler($this->connection_block_handler, array($reason));
    }
873
874
    /**
     * Handles a connection unblocked notification by calling the registered
     * unblock handler without arguments.
     *
     * @param AMQPReader $args Not used; an unblock event carries no arguments
     */
    protected function connection_unblocked(AMQPReader $args)
    {
        $no_arguments = array();

        $this->dispatch_to_handler($this->connection_unblock_handler, $no_arguments);
    }
884
885
    /**
     * Registers the callback invoked when the server reports that the
     * connection is blocked. It is passed the reason string.
     *
     * @param callable $callback
     */
    public function set_connection_block_handler($callback)
    {
        $this->connection_block_handler = $callback;
    }
894
895
    /**
     * Registers the callback invoked when the server reports that the
     * connection is unblocked. It is called without arguments.
     *
     * @param callable $callback
     */
    public function set_connection_unblock_handler($callback)
    {
        $this->connection_unblock_handler = $callback;
    }
904
905 84
    /**
     * Reports the connection status flag maintained by this object.
     *
     * @return bool
     */
    public function isConnected()
    {
        return (bool) $this->is_connected;
    }
914
915 84
    /**
     * Updates the connection status flag.
     *
     * @param bool $is_connected
     */
    protected function setIsConnected($is_connected)
    {
        $this->is_connected = $is_connected ? true : false;
    }
924
925 84
    /**
     * Closes every registered channel except entry 0, ignoring errors raised
     * while closing.
     */
    protected function closeChannels()
    {
        foreach ($this->channels as $id => $channel) {
            // channels[0] is this connection object, so don't close it yet
            if ($id !== 0) {
                try {
                    $channel->close();
                } catch (\Exception $ignored) {
                    /* Ignore closing errors */
                }
            }
        }
    }
942
943 60
    /**
     * Tells the constructor whether to connect right away.
     * This implementation always answers yes.
     *
     * @return bool
     */
    public function connectOnConstruct()
    {
        return true;
    }
952
953
    /**
     * Returns the server properties table stored when connection.start was handled.
     *
     * @return array
     */
    public function getServerProperties()
    {
        return $this->server_properties;
    }
960
}
961