Completed
Pull Request — master (#354)
by
unknown
22:18
created

AbstractConnection::connection_tune()   A

Complexity

Conditions 4
Paths 8

Size

Total Lines 19
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 4

Importance

Changes 3
Bugs 2 Features 0
Metric Value
c 3
b 2
f 0
dl 0
loc 19
ccs 10
cts 10
cp 1
rs 9.2
cc 4
eloc 10
nc 8
nop 1
crap 4
1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AbstractChannel;
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPWriter;
11
use PhpAmqpLib\Wire\IO\AbstractIO;
12
use PhpAmqpLib\Wire\IO\SocketIO;
13
14
class AbstractConnection extends AbstractChannel
15
{
16
    /** @var array */
17
    public static $LIBRARY_PROPERTIES = array(
18
        'product' => array('S', 'AMQPLib'),
19
        'platform' => array('S', 'PHP'),
20
        'version' => array('S', '2.4'),
21
        'information' => array('S', ''),
22
        'copyright' => array('S', ''),
23
        'capabilities' => array(
24
            'F',
25
            array(
26
                'authentication_failure_close' => array('t', true),
27
                'publisher_confirms' => array('t', true),
28
                'consumer_cancel_notify' => array('t', true),
29
                'exchange_exchange_bindings' => array('t', true),
30
                'basic.nack' => array('t', true),
31
                'connection.blocked' => array('t', true)
32
            )
33
        )
34
    );
35
36
    /** @var AMQPChannel[] */
37
    public $channels = array();
38
39
    /** @var int */
40
    protected $version_major;
41
42
    /** @var int */
43
    protected $version_minor;
44
45
    /** @var array */
46
    protected $server_properties;
47
48
    /** @var array */
49
    protected $mechanisms;
50
51
    /** @var array */
52
    protected $locales;
53
54
    /** @var bool */
55
    protected $wait_tune_ok;
56
57
    /** @var string */
58
    protected $known_hosts;
59
60
    /** @var AMQPReader */
61
    protected $input;
62
63
    /** @var string */
64
    protected $vhost;
65
66
    /** @var bool */
67
    protected $insist;
68
69
    /** @var string */
70
    protected $login_method;
71
72
    /** @var string|null Encoded login table (length prefix stripped), or null when no credentials were given */
73
    protected $login_response;
74
75
    /** @var string */
76
    protected $locale;
77
78
    /** @var int */
79
    protected $heartbeat;
80
81
    /** @var float */
82
    protected $last_frame;
83
84
    /** @var SocketIO */
85
    protected $sock;
86
87
    /** @var int */
88
    protected $channel_max = 65535;
89
90
    /** @var int */
91
    protected $frame_max = 131072;
92
93
     /** @var array Constructor parameters for clone */
94
    protected $construct_params;
95
96
    /** @var bool Close the connection in destructor */
97
    protected $close_on_destruct = true;
98
99
    /** @var bool Maintain connection status */
100
    protected $is_connected = false;
101
102
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
103
    protected $io;
104
105
    /** @var \PhpAmqpLib\Wire\AMQPReader */
106
    protected $wait_frame_reader;
107
108
    /** @var callable Handles connection blocking from the server */
109
    private $connection_block_handler;
110
111
    /** @var callable Handles connection unblocking from the server */
112
    private $connection_unblock_handler;
113
114
    /**
115
     * Circular buffer to speed up prepare_content().
116
     * Max size limited by $prepare_content_cache_max_size.
117
     *
118
     * @var array
119
     * @see prepare_content()
120
     */
121
    private $prepare_content_cache;
122
123
    /** @var int Maximal size of $prepare_content_cache */
124
    private $prepare_content_cache_max_size;
125
126
    /**
     * Records the connection settings, builds the login response from the
     * credentials and connects immediately when connectOnConstruct() allows it.
     *
     * @param string $user
     * @param string $password
     * @param string $vhost
     * @param bool $insist
     * @param string $login_method
     * @param string|null $login_response Not kept: the stored value is always rebuilt
     *                                    from $user/$password, or reset to null
     * @param string $locale
     * @param AbstractIO $io
     * @param int $heartbeat
     * @throws \Exception
     */
    public function __construct(
        $user,
        $password,
        $vhost = '/',
        $insist = false,
        $login_method = 'AMQPLAIN',
        $login_response = null,
        $locale = 'en_US',
        AbstractIO $io,
        $heartbeat = 0
    ) {
        // Captured first, before any argument is touched: __clone() replays them
        $this->construct_params = func_get_args();

        $this->wait_frame_reader = new AMQPReader(null);
        $this->io = $io;
        $this->vhost = $vhost;
        $this->insist = $insist;
        $this->locale = $locale;
        $this->heartbeat = $heartbeat;
        $this->login_method = $login_method;

        // Without both credentials there is no login response at all
        $this->login_response = null;
        if ($user && $password) {
            // The table is encoded in a local writer so the property only
            // ever holds the final string
            $credentials = new AMQPWriter();
            $credentials->write_table(array(
                'LOGIN' => array('S', $user),
                'PASSWORD' => array('S', $password)
            ));

            // Skip the length: the first 4 characters of the encoded table are dropped
            $encoded = $credentials->getvalue();
            $this->login_response = mb_substr($encoded, 4, mb_strlen($encoded, 'ASCII') - 4, 'ASCII');
        }

        $this->prepare_content_cache = array();
        $this->prepare_content_cache_max_size = 100;

        // Lazy connections postpone connecting until it is actually needed
        if ($this->connectOnConstruct()) {
            $this->connect();
        }
    }
184
185 60
    /**
     * Connects to the AMQP server.
     *
     * Opens the socket and performs the protocol handshake. When x_open()
     * reports a redirect host, everything is torn down and the loop starts over.
     *
     * @return null
     * @throws \Exception Whatever failed while connecting, rethrown after the
     *                    connection has been flagged as not connected
     */
    protected function connect()
    {
        try {
            // Keep going until a handshake completes without a redirect
            while (!$this->isConnected()) {
                // Optimistically mark the connection as up; failure paths reset it
                $this->setIsConnected(true);

                // Bring up the underlying socket/stream
                $this->getIO()->connect();

                $this->channels = array();
                // The connection object itself is treated as channel 0. Calling the
                // parent constructor from connect() is deliberate, not a typo.
                parent::__construct($this, 0);

                $this->input = new AMQPReader(null, $this->getIO());

                $this->write($this->amqp_protocol_header);
                $this->wait(array($this->waitHelper->get_wait('connection.start')));
                $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale);

                // NOTE(review): wait_tune_ok is presumably cleared by the connection.tune
                // handler, which is outside this view; confirm
                $this->wait_tune_ok = true;
                while ($this->wait_tune_ok) {
                    $this->wait(array(
                        $this->waitHelper->get_wait('connection.secure'),
                        $this->waitHelper->get_wait('connection.tune')
                    ));
                }

                $redirect_host = $this->x_open($this->vhost, '', $this->insist);
                if (!$redirect_host) {
                    // No redirect: the connection is established
                    return null;
                }

                // Redirected: drop channels and socket, then loop and try again
                $this->setIsConnected(false);
                $this->closeChannels();
                $this->close_socket();
            }
        } catch (\Exception $failure) {
            // Something went wrong: update the connection status before rethrowing
            $this->setIsConnected(false);
            $this->closeChannels();
            throw $failure;
        }
    }
236
237
    /**
     * Reconnects using the original connection settings.
     *
     * Channels that existed before the reconnect are not recreated.
     */
    public function reconnect()
    {
        // Best-effort shutdown of the current AMQP connection
        $this->safeClose();

        // Re-establish the socket/stream first...
        $this->getIO()->reconnect();

        // ...then force the flag down: getIO() may have changed it via
        // LazyConnection, and connect() only loops while not connected
        $this->setIsConnected(false);
        $this->connect();
    }
250
251
    /**
     * Re-runs the constructor on the clone with the arguments saved by the
     * original, so the copy opens its own connection to the same server.
     */
    public function __clone()
    {
        $constructor = array($this, '__construct');
        call_user_func_array($constructor, $this->construct_params);
    }
258
259
    /**
     * Closes the connection on destruction unless that was switched off
     * through set_close_on_destruct().
     */
    public function __destruct()
    {
        if (!$this->close_on_destruct) {
            return;
        }

        $this->safeClose();
    }
265
266 20
    /**
     * Attempts to close the connection safely.
     *
     * close() is only called when an input reader is present, and any
     * exception it raises is deliberately discarded.
     */
    protected function safeClose()
    {
        try {
            if (!empty($this->input)) {
                $this->close();
            }
        } catch (\Exception $ignored) {
            // Best effort only: failures while closing are intentionally ignored
        }
    }
279
280
    /**
     * Delegates to the IO layer's select() and returns its result.
     *
     * @param int $sec
     * @param int $usec
     * @return mixed
     */
    public function select($sec, $usec = 0)
    {
        $io = $this->getIO();

        return $io->select($sec, $usec);
    }
289
290
    /**
     * Controls whether the destructor closes the connection.
     *
     * Useful after a fork, when the parent process connection must stay open.
     *
     * @param bool $close
     */
    public function set_close_on_destruct($close = true)
    {
        $flag = (bool) $close;
        $this->close_on_destruct = $flag;
    }
300 60
301
    /**
     * Closes the input reader, if there is one, and forgets it.
     */
    protected function close_input()
    {
        $this->debug->debug_msg('closing input');

        if ($this->input === null) {
            return;
        }

        $this->input->close();
        $this->input = null;
    }
310 60
311
    /**
     * Closes the IO object returned by getIO(), if there is one.
     */
    protected function close_socket()
    {
        $this->debug->debug_msg('closing socket');

        if ($this->getIO() === null) {
            return;
        }

        $this->getIO()->close();
    }
319
320 60
    /**
     * Hexdumps $data to the debug helper and writes it to the IO layer.
     *
     * @param string $data
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException Rethrown from the IO layer
     *         after the connection has been flagged as not connected
     */
    public function write($data)
    {
        $this->debug->debug_hexdump($data);

        try {
            $io = $this->getIO();
            $io->write($data);
        } catch (AMQPRuntimeException $failure) {
            // Record the failure in the connection status before propagating it
            $this->setIsConnected(false);
            throw $failure;
        }
    }
334 60
335 60
    /**
     * Marks the connection as closed, then closes the input reader and the socket.
     */
    protected function do_close()
    {
        // Flag first, so the status is already down while resources are released
        $this->setIsConnected(false);

        $this->close_input();
        $this->close_socket();
    }
341
342
    /**
     * Returns the lowest id in 1..channel_max that has no entry in $channels.
     *
     * @return int
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When every id is taken
     */
    public function get_free_channel_id()
    {
        $candidate = 1;
        while ($candidate <= $this->channel_max) {
            if (!isset($this->channels[$candidate])) {
                return $candidate;
            }
            $candidate++;
        }

        throw new AMQPRuntimeException('No free channel ids');
    }
356
357
    /**
     * Builds the content frames for a message and writes them to the server.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter|null $pkt Optional writer to append to; a new one is created when null
     */
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        // prepare_content() creates the writer itself when $pkt is null, so its
        // return value must be used; reading the local $pkt would fail on null.
        $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
        $this->write($pkt->getvalue());
    }
371
372
    /**
     * Serializes a content header and the body frames into a writer.
     *
     * Returns a new AMQPWriter, or the provided $pkt with the frames appended.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter|null $pkt
     * @return AMQPWriter
     */
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        // The leading part of the header only depends on these four values,
        // so it is cached under a key built from them
        $key_cache = sprintf('%s|%s|%s|%s', $channel, $packed_properties, $class_id, $weight);

        if (!isset($this->prepare_content_cache[$key_cache])) {
            $header = new AMQPWriter();
            $header->write_octet(2);
            $header->write_short($channel);
            // 12 = class_id (short) + weight (short) + body_size (longlong)
            $header->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $header->write_short($class_id);
            $header->write_short($weight);
            $this->prepare_content_cache[$key_cache] = $header->getvalue();

            // Over the limit: drop the entry at the front of the array
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                reset($this->prepare_content_cache);
                $oldest = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$oldest]);
            }
        }

        $pkt->write($this->prepare_content_cache[$key_cache]);
        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);
        $pkt->write_octet(0xCE);

        // Memory efficiency: step through the body one slice at a time instead
        // of repeatedly cutting it up. Matters for very large bodies (close in
        // size to the memory_limit setting).
        $chunk_size = $this->frame_max - 8;
        $body_length = mb_strlen($body, 'ASCII');
        for ($offset = 0; $offset < $body_length; $offset += $chunk_size) {
            $chunk = mb_substr($body, $offset, $chunk_size, 'ASCII');

            $pkt->write_octet(3);
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($chunk, 'ASCII'));
            $pkt->write($chunk);
            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }
439
440
    /**
     * Builds a method frame for the given channel and writes it to the server.
     *
     * @param string $channel
     * @param array $method_sig Pair of class id and method id
     * @param string|AMQPWriter $args
     * @param AMQPWriter|null $pkt
     */
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $frame = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
        $raw = $frame->getvalue();

        $this->write($raw);
        $this->debug->debug_method_signature1($method_sig);
    }
452
453
    /**
     * Serializes a method frame into a writer.
     *
     * Returns a new AMQPWriter, or the provided $pkt with the frame appended.
     *
     * @param string $channel
     * @param array $method_sig Pair of class id and method id
     * @param string|AMQPWriter $args Already-encoded arguments, or a writer holding them
     * @param AMQPWriter|null $pkt
     * @return AMQPWriter
     */
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        if ($args instanceof AMQPWriter) {
            $args = $args->getvalue();
        }

        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        list($class_id, $method_id) = $method_sig;

        $pkt->write_octet(1);
        $pkt->write_short($channel);
        // Payload length: the arguments plus 4 for class_id and method_id
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4);
        $pkt->write_short($class_id);
        $pkt->write_short($method_id);
        $pkt->write($args);
        $pkt->write_octet(0xCE);

        $this->debug->debug_method_signature1($method_sig);

        return $pkt;
    }
485
486
    /**
     * Waits for a frame from the server.
     *
     * The reader's timeout is switched to $timeout for the duration of the
     * read and put back afterwards, whether the read succeeds or throws.
     *
     * @param int $timeout
     * @return array Frame type, channel and payload, in that order
     * @throws \Exception
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When there is no input reader,
     *         or the frame does not end with the expected 0xCE octet
     */
    protected function wait_frame($timeout = 0)
    {
        if (is_null($this->input)) {
            $this->setIsConnected(false);
            throw new AMQPRuntimeException('Broken pipe or closed connection');
        }

        $currentTimeout = $this->input->getTimeout();
        $this->input->setTimeout($timeout);

        try {
            // frame_type + channel_id + size
            $this->wait_frame_reader->reuse(
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
            );

            $frame_type = $this->wait_frame_reader->read_octet();
            $channel = $this->wait_frame_reader->read_short();
            $size = $this->wait_frame_reader->read_long();

            // payload + trailing frame-end octet
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

            $payload = $this->wait_frame_reader->read($size);
            $ch = $this->wait_frame_reader->read_octet();
        } catch (\Exception $e) {
            // Put the previous timeout back on every failure, not only on timeouts,
            // so a failed read cannot leave the temporary value on the reader.
            // The reader may have been dropped while handling the failure.
            if (!is_null($this->input)) {
                $this->input->setTimeout($currentTimeout);
            }
            throw $e;
        }

        $this->input->setTimeout($currentTimeout);

        if ($ch != 0xCE) {
            throw new AMQPRuntimeException(sprintf(
                'Framing error, unexpected byte: %x',
                $ch
            ));
        }

        return array($frame_type, $channel, $payload);
    }
538
539
    /**
     * Waits for a frame from the server destined for a particular channel.
     *
     * Heartbeat frames are skipped. Frames for other channels are queued on
     * those channels while waiting.
     *
     * @param string $channel_id
     * @param int $timeout
     * @return array Frame type and payload, in that order
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When the server heartbeat was missed
     */
    protected function wait_channel($channel_id, $timeout = 0)
    {
        // Work on a copy so the original timeout stays unchanged
        $remaining = $timeout;

        while (true) {
            $started = time();

            try {
                list($frame_type, $frame_channel, $payload) = $this->wait_frame($remaining);
            } catch (AMQPTimeoutException $e) {
                if ($this->heartbeat && microtime(true) - ($this->heartbeat * 2) > $this->last_frame) {
                    $this->debug->debug_msg("missed server heartbeat (at threshold * 2)");
                    $this->setIsConnected(false);
                    throw new AMQPRuntimeException("Missed server heartbeat");
                }

                throw $e;
            }

            $this->last_frame = microtime(true);

            if ($frame_channel === 0 && $frame_type === 8) {
                // Heartbeat: skip it and shrink the timeout by the time already spent
                $this->debug->debug_msg("received server heartbeat");
                if ($remaining > 0) {
                    $remaining -= time() - $started;
                    if ($remaining <= 0) {
                        // Out of time: fail here instead of calling wait_frame() again
                        throw new AMQPTimeoutException("Timeout waiting on channel");
                    }
                }

                continue;
            }

            if ($frame_channel == $channel_id) {
                return array($frame_type, $payload);
            }

            // Not the channel we were looking for. Queue this frame for later,
            // when the other channel is looking for frames. The channel may
            // have been closed by a previous exception, so check that it
            // still exists. frame_queue is declared protected on
            // AbstractChannel, which this class extends.
            if (isset($this->channels[$frame_channel])) {
                array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
            }

            // A method frame on channel 0 (the connection itself) is probably
            // a close in reaction to some error, so deal with it right away
            if (($frame_type == 1) && ($frame_channel == 0)) {
                $this->wait();
            }
        }
    }
602 60
603
    /**
     * Returns the channel registered under $channel_id, creating it when it
     * does not exist yet. Without an id, the next free one is used.
     *
     * @param string|int|null $channel_id
     * @return AMQPChannel
     */
    public function channel($channel_id = null)
    {
        if (isset($this->channels[$channel_id])) {
            return $this->channels[$channel_id];
        }

        if (!$channel_id) {
            $channel_id = $this->get_free_channel_id();
        }

        $created = new AMQPChannel($this->connection, $channel_id);
        $this->channels[$channel_id] = $created;

        return $created;
    }
623 60
624 48
    /**
     * Requests a connection close and waits for the server's close-ok.
     *
     * Does nothing and returns null when there is no protocol writer or the
     * connection is already flagged as not connected.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig
     * @return mixed|null
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        if (!$this->protocolWriter || !$this->isConnected()) {
            return null;
        }

        $this->closeChannels();

        $frame = $this->protocolWriter->connectionClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        list($class_id, $method_id, $args) = $frame;
        $this->send_method_frame(array($class_id, $method_id), $args);

        // Flagged as closed before the server's confirmation is awaited
        $this->setIsConnected(false);

        $expected = array($this->waitHelper->get_wait('connection.close_ok'));

        return $this->wait($expected);
    }
654
655
    /**
     * Handles a close request from the server: confirms it, then reports it
     * to the caller as an exception.
     *
     * @param AMQPReader $args
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException Always
     */
    protected function connection_close($args)
    {
        // Fields are read in the order they appear in the frame
        $reply_code = $args->read_short();
        $reply_text = $args->read_shortstr();
        $failed_method = array($args->read_short(), $args->read_short());

        $this->x_close_ok();

        throw new AMQPProtocolConnectionException($reply_code, $reply_text, $failed_method);
    }
670 60
671 60
    /**
     * Confirms a connection close, then releases the local resources.
     */
    protected function x_close_ok()
    {
        // get_wait() yields a comma-separated signature; split it into its parts
        $signature = explode(',', $this->waitHelper->get_wait('connection.close_ok'));

        $this->send_method_frame($signature);
        $this->do_close();
    }
681 60
682 60
    /**
     * Handles the server's confirmation of a connection close.
     *
     * @param AMQPReader $args Not used here
     */
    protected function connection_close_ok($args)
    {
        $this->do_close();
    }
689 48
690
    /**
     * Sends the open request for a virtual host and waits for the answer.
     *
     * @param string $virtual_host
     * @param string $capabilities
     * @param bool $insist
     * @return mixed Result of wait(); connect() treats a truthy value as a redirect host
     */
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $frame_args = new AMQPWriter();
        $frame_args->write_shortstr($virtual_host);
        $frame_args->write_shortstr($capabilities);
        $frame_args->write_bits(array($insist));
        // The writer is passed as is rather than as an encoded string
        $this->send_method_frame(array(10, 40), $frame_args);

        $expected = array(
            $this->waitHelper->get_wait('connection.open_ok')
        );

        // A redirect is only awaited on protocol version 0.8
        if ($this->protocolVersion == '0.8') {
            $expected[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($expected);
    }
714
715
    /**
     * Signals that the connection is ready; stores the known hosts reported
     * by the server.
     *
     * @param AMQPReader $args
     */
    protected function connection_open_ok($args)
    {
        $hosts = $args->read_shortstr();
        $this->known_hosts = $hosts;

        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
    }
725
726
    /**
     * Handles the server asking the client to use a different server.
     *
     * @param AMQPReader $args
     * @return string The host to connect to instead
     */
    protected function connection_redirect($args)
    {
        // Read order matters: the host comes first, then the known hosts
        $target = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();

        $message = sprintf(
            'Redirected to [%s], known_hosts [%s]',
            $target,
            $this->known_hosts
        );
        $this->debug->debug_msg($message);

        return $target;
    }
744
745
    /**
     * Security mechanism challenge.
     *
     * The challenge is read from the frame and then discarded; no response
     * is sent from here.
     *
     * @param AMQPReader $args
     */
    protected function connection_secure($args)
    {
        $args->read_longstr();
    }
754
755 60
    /**
     * Security mechanism response.
     *
     * @param string $response
     */
    protected function x_secure_ok($response)
    {
        $frame_args = new AMQPWriter();
        $frame_args->write_longstr($response);

        // The writer is passed as is rather than as an encoded string
        $this->send_method_frame(array(10, 21), $frame_args);
    }
764 60
765 60
    /**
     * Starts connection negotiation: records what the server announced about
     * itself and passes it on to the debug helper.
     *
     * @param AMQPReader $args
     */
    protected function connection_start($args)
    {
        // Fields are consumed in frame order
        $this->version_major = $args->read_octet();
        $this->version_minor = $args->read_octet();
        // NOTE(review): static analysis reports that read_table() may return an
        // AMQPTable object rather than an array; confirm against AMQPReader
        $this->server_properties = $args->read_table();

        // Mechanisms and locales each arrive as one space-separated string
        $mechanisms = $args->read_longstr();
        $this->mechanisms = explode(' ', $mechanisms);
        $locales = $args->read_longstr();
        $this->locales = explode(' ', $locales);

        $this->debug->debug_connection_start(
            $this->version_major,
            $this->version_minor,
            $this->server_properties,
            $this->mechanisms,
            $this->locales
        );
    }
786
787
    /**
     * Sends the reply to the start frame as method (10, 11), which is
     * connection.start-ok in AMQP 0-9-1.
     *
     * The four arguments are serialised in this order: table, short string,
     * long string, short string.
     *
     * @param $client_properties written with write_table()
     * @param $mechanism written with write_shortstr()
     * @param $response written with write_longstr()
     * @param $locale written with write_shortstr()
     */
    protected function x_start_ok($client_properties, $mechanism, $response, $locale)
    {
        $payload = new AMQPWriter();

        $payload->write_table($client_properties);
        $payload->write_shortstr($mechanism);
        $payload->write_longstr($response);
        $payload->write_shortstr($locale);

        // NOTE(review): static analysis on this file reports that
        // send_method_frame() is documented as expecting a string here, yet it
        // is given the AMQPWriter object.
        $this->send_method_frame(array(10, 11), $payload);
    }
802
803
    /**
     * Proposes connection tuning parameters
     *
     * Reads a short (channel max) and a long (frame max) from the frame and
     * stores each one only when it is truthy. A further short is read into
     * the heartbeat only when no heartbeat has been set on this connection.
     * The resulting values are then sent back through x_tune_ok().
     *
     * @param AMQPReader $args
     */
    protected function connection_tune($args)
    {
        // A falsy proposal leaves the current channel_max untouched.
        $proposedChannelMax = $args->read_short();
        if ($proposedChannelMax) {
            $this->channel_max = $proposedChannelMax;
        }

        // Same rule for frame_max.
        // NOTE(review): static analysis on this file reports that $frame_max is
        // declared as integer while the value coming from read_long() is typed
        // as string; no cast is applied here.
        $proposedFrameMax = $args->read_long();
        if ($proposedFrameMax) {
            $this->frame_max = $proposedFrameMax;
        }

        // use server proposed value if not set
        if (null === $this->heartbeat) {
            $this->heartbeat = $args->read_short();
        }

        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
    }
827
828
    /**
     * Negotiates connection tuning parameters
     *
     * Serialises the three values (short, long, short), sends them as method
     * (10, 31), which is connection.tune-ok in AMQP 0-9-1, and then clears
     * the wait_tune_ok flag.
     *
     * @param $channel_max written with write_short()
     * @param $frame_max written with write_long()
     * @param $heartbeat written with write_short()
     */
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        $payload = new AMQPWriter();

        $payload->write_short($channel_max);
        $payload->write_long($frame_max);
        $payload->write_short($heartbeat);

        // NOTE(review): static analysis on this file reports that
        // send_method_frame() is documented as expecting a string here, yet it
        // is given the AMQPWriter object.
        $this->send_method_frame(array(10, 31), $payload);

        // Flag is only cleared once the frame has been handed off.
        $this->wait_tune_ok = false;
    }
844
845
    /**
     * Returns whatever the IO object's own getSocket() returns.
     *
     * NOTE(review): the declared return type is kept as originally
     * documented; the value is presumably the underlying socket rather than
     * a SocketIO instance - confirm against the IO classes.
     *
     * @return SocketIO
     */
    public function getSocket()
    {
        $io = $this->io;

        return $io->getSocket();
    }
852
853
    /**
     * Gives subclasses access to the IO object held by this connection.
     *
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
     */
    protected function getIO()
    {
        $io = $this->io;

        return $io;
    }
860
861
    /**
     * Handles connection blocked notifications
     *
     * Reads one short string from the frame (the reason) and passes it as the
     * only argument to the registered block handler.
     *
     * @param AMQPReader $args
     */
    protected function connection_blocked(AMQPReader $args)
    {
        $reason = $args->read_shortstr();

        // Call the block handler and pass in the reason
        $this->dispatch_to_handler($this->connection_block_handler, array($reason));
    }
871
872
    /**
     * Handles connection unblocked notifications
     *
     * Invokes the registered unblock handler with an empty argument list.
     *
     * @param AMQPReader $args not read by this method (static analysis on this
     *                         file flags it as unused); presumably kept so the
     *                         signature matches connection_blocked() - confirm
     *                         before removing
     */
    protected function connection_unblocked(AMQPReader $args)
    {
        // No args to an unblock event
        $noArguments = array();

        $this->dispatch_to_handler($this->connection_unblock_handler, $noArguments);
    }
880
881
    /**
     * Registers the callback that connection_blocked() dispatches to when the
     * server reports that the connection is blocked.
     *
     * The callback is stored as given; connection_blocked() invokes it with a
     * single argument, the reason string read from the frame.
     *
     * @param callable $callback
     */
    public function set_connection_block_handler($callback)
    {
        $this->connection_block_handler = $callback;
    }
890
891 60
    /**
     * Sets a handler which is called whenever a connection.unblocked is sent from the server
     *
     * The callback is stored as given; connection_unblocked() invokes it with
     * no arguments.
     *
     * @param callable $callback
     */
    public function set_connection_unblock_handler($callback)
    {
        $this->connection_unblock_handler = $callback;
    }
900
901 60
    /**
     * Reports the connection status flag held by this object.
     *
     * @return bool
     */
    public function isConnected()
    {
        $connected = $this->is_connected;

        return $connected;
    }
910
911 60
    /**
     * Updates the connection status flag.
     *
     * The given value is cast to bool before it is stored.
     *
     * @param bool $is_connected
     */
    protected function setIsConnected($is_connected)
    {
        $connected = (bool) $is_connected;

        $this->is_connected = $connected;
    }
920 60
921
    /**
     * Closes all available channels
     *
     * Every entry of $this->channels except key 0 is closed. An exception
     * thrown by a single close() is swallowed so the remaining channels are
     * still attempted.
     */
    protected function closeChannels()
    {
        foreach ($this->channels as $id => $openChannel) {
            // channels[0] is this connection object, so don't close it yet
            if ($id !== 0) {
                try {
                    $openChannel->close();
                } catch (\Exception $e) {
                    /* Ignore closing errors */
                }
            }
        }
    }
938
939
    /**
     * Should the connection be attempted during construction?
     *
     * This implementation always answers yes.
     *
     * @return bool
     */
    public function connectOnConstruct()
    {
        $attemptDuringConstruction = true;

        return $attemptDuringConstruction;
    }
948
}
949