AbstractConnection::close_input()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 3
Bugs 1 Features 0
Metric Value
c 3
b 1
f 0
dl 0
loc 9
ccs 7
cts 7
cp 1
rs 9.6666
cc 2
eloc 5
nc 2
nop 0
crap 2
1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AbstractChannel;
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPWriter;
11
use PhpAmqpLib\Wire\IO\AbstractIO;
12
use PhpAmqpLib\Wire\IO\SocketIO;
13
14
class AbstractConnection extends AbstractChannel
15
{
16
    /** @var array */
17
    public static $LIBRARY_PROPERTIES = array(
18
        'product' => array('S', 'AMQPLib'),
19
        'platform' => array('S', 'PHP'),
20
        'version' => array('S', '2.4'),
21
        'information' => array('S', ''),
22
        'copyright' => array('S', ''),
23
        'capabilities' => array(
24
            'F',
25
            array(
26
                'authentication_failure_close' => array('t', true),
27
                'publisher_confirms' => array('t', true),
28
                'consumer_cancel_notify' => array('t', true),
29
                'exchange_exchange_bindings' => array('t', true),
30
                'basic.nack' => array('t', true),
31
                'connection.blocked' => array('t', true)
32
            )
33
        )
34
    );
35
36
    /** @var AMQPChannel[] */
37
    public $channels = array();
38
39
    /** @var int */
40
    protected $version_major;
41
42
    /** @var int */
43
    protected $version_minor;
44
45
    /** @var array */
46
    protected $server_properties;
47
48
    /** @var array */
49
    protected $mechanisms;
50
51
    /** @var array */
52
    protected $locales;
53
54
    /** @var bool */
55
    protected $wait_tune_ok;
56
57
    /** @var string */
58
    protected $known_hosts;
59
60
    /** @var AMQPReader */
61
    protected $input;
62
63
    /** @var string */
64
    protected $vhost;
65
66
    /** @var bool */
67
    protected $insist;
68
69
    /** @var string */
70
    protected $login_method;
71
72
    /** @var AMQPWriter */
73
    protected $login_response;
74
75
    /** @var string */
76
    protected $locale;
77
78
    /** @var int */
79
    protected $heartbeat;
80
81
    /** @var SocketIO */
82
    protected $sock;
83
84
    /** @var int */
85
    protected $channel_max = 65535;
86
87
    /** @var int */
88
    protected $frame_max = 131072;
89
90
     /** @var array Constructor parameters for clone */
91
    protected $construct_params;
92
93
    /** @var bool Close the connection in destructor */
94
    protected $close_on_destruct = true;
95
96
    /** @var bool Maintain connection status */
97
    protected $is_connected = false;
98
99
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
100
    protected $io;
101
102
    /** @var \PhpAmqpLib\Wire\AMQPReader */
103
    protected $wait_frame_reader;
104
105
    /** @var callable Handles connection blocking from the server */
106
    private $connection_block_handler;
107
108
    /** @var callable Handles connection unblocking from the server */
109
    private $connection_unblock_handler;
110
111
    /**
112
     * Circular buffer to speed up prepare_content().
113
     * Max size limited by $prepare_content_cache_max_size.
114
     *
115
     * @var array
116
     * @see prepare_content()
117
     */
118
    private $prepare_content_cache;
119
120
    /** @var int Maximal size of $prepare_content_cache */
121
    private $prepare_content_cache_max_size;
122
123
    /**
     * Stores the connection settings, builds the login response from the
     * credentials and, when connectOnConstruct() allows it, connects right away.
     *
     * @param string $user
     * @param string $password
     * @param string $vhost
     * @param bool $insist
     * @param string $login_method
     * @param string|null $login_response Always overwritten below: replaced by the
     *                                    encoded credentials, or reset to null
     * @param string $locale
     * @param AbstractIO $io
     * @param int $heartbeat
     * @throws \Exception
     */
    public function __construct(
        $user,
        $password,
        $vhost = '/',
        $insist = false,
        $login_method = 'AMQPLAIN',
        $login_response = null,
        $locale = 'en_US',
        AbstractIO $io,
        $heartbeat = 0
    ) {
        // Keep the original arguments so that __clone() can replay them
        $this->construct_params = func_get_args();

        $this->wait_frame_reader = new AMQPReader(null);
        $this->vhost = $vhost;
        $this->insist = $insist;
        $this->login_method = $login_method;
        $this->login_response = $login_response;
        $this->locale = $locale;
        $this->io = $io;
        $this->heartbeat = $heartbeat;

        if ($user && $password) {
            $credentials = new AMQPWriter();
            $credentials->write_table(array(
                'LOGIN' => array('S', $user),
                'PASSWORD' => array('S', $password)
            ));

            // Skip the length: the first 4 bytes of the encoded table are dropped
            $encoded = $credentials->getvalue();
            $this->login_response = mb_substr($encoded, 4, mb_strlen($encoded, 'ASCII') - 4, 'ASCII');
        } else {
            $this->login_response = null;
        }

        $this->prepare_content_cache = array();
        $this->prepare_content_cache_max_size = 100;

        // Lazy connections postpone the actual connect
        if ($this->connectOnConstruct()) {
            $this->connect();
        }
    }
181
182
    /**
     * Connects to the AMQP server.
     *
     * Opens the IO, sends the protocol header and walks through the
     * connection.start / tune / open exchange. When x_open() returns a
     * non-empty value the attempt is treated as a redirect: channels and
     * socket are closed and the loop starts over.
     *
     * @return null
     * @throws \Exception Rethrown after the connection has been flagged as closed
     */
    protected function connect()
    {
        try {
            // Keep going until a handshake finishes without a redirect
            while (!$this->isConnected()) {
                // Optimistically flag the connection as up; reverted on redirect or failure
                $this->setIsConnected(true);

                // Open the underlying socket/stream
                $this->getIO()->connect();

                $this->channels = array();
                // Channel 0 is the connection object itself
                parent::__construct($this, 0);

                $this->input = new AMQPReader(null, $this->getIO());

                $this->write($this->amqp_protocol_header);
                $this->wait(array($this->waitHelper->get_wait('connection.start')));
                $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale);

                $this->wait_tune_ok = true;
                while ($this->wait_tune_ok) {
                    $this->wait(array(
                        $this->waitHelper->get_wait('connection.secure'),
                        $this->waitHelper->get_wait('connection.tune')
                    ));
                }

                $host = $this->x_open($this->vhost, '', $this->insist);
                if ($host) {
                    // Redirected: tear everything down, then loop and try again
                    $this->setIsConnected(false);
                    $this->closeChannels();
                    $this->close_socket();
                    continue;
                }

                return null; // no redirect, the connection is ready
            }
        } catch (\Exception $e) {
            // Something went wrong: record the closed state before propagating
            $this->setIsConnected(false);
            $this->closeChannels();
            throw $e;
        }
    }
233
234
    /**
     * Reconnects with the settings the connection was created with.
     * Channels that were open before are not recreated.
     */
    public function reconnect()
    {
        // Best-effort shutdown of the current AMQP connection
        $this->safeClose();

        // Bring the socket/stream back first, then the AMQP session on top of it
        $this->getIO()->reconnect();
        // getIO() may already have flagged the connection as open (lazy connections), so reset it
        $this->setIsConnected(false);
        $this->connect();
    }
247
248
    /**
     * Re-runs the constructor with the saved arguments, so the clone sets up
     * its own connection to the same server.
     */
    public function __clone()
    {
        $constructor = array($this, '__construct');
        call_user_func_array($constructor, $this->construct_params);
    }
255
256
    /**
     * Closes the connection on destruction unless close_on_destruct was turned off.
     */
    public function __destruct()
    {
        if (!$this->close_on_destruct) {
            return;
        }

        $this->safeClose();
    }
262
263
    /**
     * Tries to close the connection; any exception raised while doing so is swallowed.
     */
    protected function safeClose()
    {
        try {
            // Only attempt a close when an input reader exists
            if (!empty($this->input)) {
                $this->close();
            }
        } catch (\Exception $e) {
            // Deliberately ignored: closing here is best effort
        }
    }
276
277
    /**
     * Delegates to select() on the underlying IO object.
     *
     * @param int $sec
     * @param int $usec
     * @return mixed Whatever the IO's select() returns
     */
    public function select($sec, $usec = 0)
    {
        $io = $this->getIO();

        return $io->select($sec, $usec);
    }
286
287
    /**
     * Controls whether the destructor closes the connection.
     * Useful after a fork, when the parent process connection must stay open.
     *
     * @param bool $close
     */
    public function set_close_on_destruct($close = true)
    {
        $this->close_on_destruct = $close ? true : false;
    }
297
298 60
    /**
     * Closes the input reader, if there is one, and clears the reference.
     */
    protected function close_input()
    {
        $this->debug->debug_msg('closing input');

        if ($this->input === null) {
            return;
        }

        $this->input->close();
        $this->input = null;
    }
307
308 60
    /**
     * Closes the underlying IO object when one is available.
     */
    protected function close_socket()
    {
        $this->debug->debug_msg('closing socket');

        if ($this->getIO() === null) {
            return;
        }

        $this->getIO()->close();
    }
316
317
    /**
     * Hex-dumps the data to the debug helper and writes it to the IO.
     *
     * @param string $data Raw bytes to send
     * @throws AMQPRuntimeException Rethrown after the connection is flagged as closed
     */
    public function write($data)
    {
        $this->debug->debug_hexdump($data);

        try {
            $io = $this->getIO();
            $io->write($data);
        } catch (AMQPRuntimeException $e) {
            // A failed write means the link is unusable; record that before propagating
            $this->setIsConnected(false);

            throw $e;
        }
    }
331
332 60
    /**
     * Flags the connection as closed, then closes the input reader and the socket.
     */
    protected function do_close()
    {
        $this->setIsConnected(false);

        $this->close_input();
        $this->close_socket();
    }
338
339
    /**
     * Returns the lowest channel id in 1..channel_max that is not in use.
     *
     * @return int
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When every id is taken
     */
    public function get_free_channel_id()
    {
        $candidate = 1;
        while ($candidate <= $this->channel_max) {
            if (!isset($this->channels[$candidate])) {
                return $candidate;
            }
            $candidate++;
        }

        throw new AMQPRuntimeException('No free channel ids');
    }
353
354
    /**
     * Builds the content frames for a message and writes them to the connection.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter|null $pkt Optional writer to append to; a new one is used when omitted
     */
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        // prepare_content() creates the writer itself when $pkt is null, so the
        // returned writer must be used; calling getvalue() on the argument would
        // fail for callers that rely on the default.
        $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
        $this->write($pkt->getvalue());
    }
368
369
    /**
     * Writes the content header frame and the body frames for a message into
     * $pkt, creating a new AMQPWriter when none is supplied.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter $pkt
     * @return AMQPWriter
     */
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        // The leading part of the header frame depends only on these four
        // values, so it is cached under a composite key.
        $cacheKey = sprintf('%s|%s|%s|%s', $channel, $packed_properties, $class_id, $weight);

        if (!isset($this->prepare_content_cache[$cacheKey])) {
            $header = new AMQPWriter();
            $header->write_octet(2);
            $header->write_short($channel);
            $header->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $header->write_short($class_id);
            $header->write_short($weight);
            $this->prepare_content_cache[$cacheKey] = $header->getvalue();

            // Over the limit: drop the first (oldest inserted) entry
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                reset($this->prepare_content_cache);
                $evicted = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$evicted]);
            }
        }

        $pkt->write($this->prepare_content_cache[$cacheKey]);
        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);
        $pkt->write_octet(0xCE); // frame-end marker

        // Memory efficiency: walk through the body in slices rather than
        // consuming it, which matters for packets close to memory_limit.
        $chunkSize = $this->frame_max - 8;
        $bodyLength = mb_strlen($body, 'ASCII');
        for ($offset = 0; $offset < $bodyLength; $offset += $chunkSize) {
            $chunk = mb_substr($body, $offset, $chunkSize, 'ASCII');

            $pkt->write_octet(3);
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($chunk, 'ASCII'));
            $pkt->write($chunk);
            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }
436
437
    /**
     * Builds a method frame for the given channel and writes it to the connection.
     *
     * @param string $channel
     * @param array $method_sig Pair of class id and method id
     * @param string|AMQPWriter $args
     * @param AMQPWriter|null $pkt
     */
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $frame = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
        $this->write($frame->getvalue());

        $this->debug->debug_method_signature1($method_sig);
    }
449
450
    /**
     * Writes a method frame into $pkt, creating a new AMQPWriter when none is supplied.
     *
     * @param string $channel
     * @param array $method_sig Pair of class id and method id
     * @param string|AMQPWriter $args Method arguments, raw or still inside a writer
     * @param AMQPWriter $pkt
     * @return AMQPWriter
     */
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        // Accept the arguments either as a raw string or as a writer holding them
        $payload = ($args instanceof AMQPWriter) ? $args->getvalue() : $args;

        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        $pkt->write_octet(1);
        $pkt->write_short($channel);
        // 4 = length of class_id and method_id in the payload
        $pkt->write_long(mb_strlen($payload, 'ASCII') + 4);

        $pkt->write_short($method_sig[0]); // class_id
        $pkt->write_short($method_sig[1]); // method_id
        $pkt->write($payload);

        $pkt->write_octet(0xCE); // frame-end marker

        $this->debug->debug_method_signature1($method_sig);

        return $pkt;
    }
482
483
    /**
     * Waits for a frame from the server.
     *
     * The reader's timeout is switched to $timeout for the duration of the
     * read and restored afterwards, whether the read succeeds or fails.
     *
     * @param int $timeout
     * @return array Frame type, channel and payload, in that order
     * @throws \Exception
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When the input is gone or the frame end byte is wrong
     */
    protected function wait_frame($timeout = 0)
    {
        if (is_null($this->input)) {
            $this->setIsConnected(false);
            throw new AMQPRuntimeException('Broken pipe or closed connection');
        }

        $currentTimeout = $this->input->getTimeout();
        $this->input->setTimeout($timeout);

        try {
            // frame_type + channel_id + size
            $this->wait_frame_reader->reuse(
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
            );

            $frame_type = $this->wait_frame_reader->read_octet();
            $channel = $this->wait_frame_reader->read_short();
            $size = $this->wait_frame_reader->read_long();

            // payload + frame-end byte
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

            $payload = $this->wait_frame_reader->read($size);
            $ch = $this->wait_frame_reader->read_octet();
        } catch (\Exception $e) {
            // Put the previous timeout back on every failure, not only on
            // timeouts, so a failed read does not leave the temporary value behind.
            if (!is_null($this->input)) {
                $this->input->setTimeout($currentTimeout);
            }
            throw $e;
        }

        $this->input->setTimeout($currentTimeout);

        if ($ch != 0xCE) {
            throw new AMQPRuntimeException(sprintf(
                'Framing error, unexpected byte: %x',
                $ch
            ));
        }

        return array($frame_type, $channel, $payload);
    }
535
536
    /**
     * Waits for a frame from the server destined for a particular channel.
     *
     * Heartbeat frames are skipped, and frames for other known channels are
     * queued on those channels.
     *
     * @param string $channel_id
     * @param int $timeout
     * @return array Frame type and payload
     * @throws AMQPTimeoutException When heartbeats alone use up the timeout
     */
    protected function wait_channel($channel_id, $timeout = 0)
    {
        // Work on a copy so the caller's timeout value stays untouched
        $remaining = $timeout;

        while (true) {
            $startedAt = time();
            list($frame_type, $frame_channel, $payload) = $this->wait_frame($remaining);

            if ($frame_channel === 0 && $frame_type === 8) {
                // Heartbeat: skip it, but charge the elapsed time against the timeout
                if ($remaining > 0) {
                    $remaining -= time() - $startedAt;
                    if ($remaining <= 0) {
                        // Budget used up: fail here instead of calling wait_frame() again
                        throw new AMQPTimeoutException("Timeout waiting on channel");
                    }
                }

                continue;
            }

            if ($frame_channel == $channel_id) {
                return array($frame_type, $payload);
            }

            // A frame for another channel: queue it for when that channel
            // looks for frames. The channel may already be gone (closed by an
            // earlier exception), hence the isset() guard.
            // NOTE(review): frame_queue is reported as protected in AbstractChannel;
            // this access relies on the channel objects sharing that base class - confirm.
            if (isset($this->channels[$frame_channel])) {
                array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
            }

            // A method frame for channel 0 (the connection itself) is probably
            // a close in reaction to an error, so handle it right away.
            if (($frame_type == 1) && ($frame_channel == 0)) {
                $this->wait();
            }
        }
    }
585
586
    /**
     * Returns the channel registered under $channel_id, creating and
     * registering a new AMQPChannel when there is none yet.
     *
     * @param int|null $channel_id Channel id; when empty, a free id is picked
     * @return AMQPChannel
     */
    public function channel($channel_id = null)
    {
        if (isset($this->channels[$channel_id])) {
            return $this->channels[$channel_id];
        }

        if (!$channel_id) {
            $channel_id = $this->get_free_channel_id();
        }

        $created = new AMQPChannel($this->connection, $channel_id);
        $this->channels[$channel_id] = $created;

        return $created;
    }
606
607
    /**
     * Requests a connection close and waits for the server's close_ok.
     *
     * Does nothing and returns null when there is no protocol writer or the
     * connection is not flagged as connected.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig
     * @return mixed|null
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        if (!($this->protocolWriter && $this->isConnected())) {
            return null;
        }

        $this->closeChannels();

        $closeFrame = $this->protocolWriter->connectionClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        list($class_id, $method_id, $args) = $closeFrame;
        $this->send_method_frame(array($class_id, $method_id), $args);

        // From here on the connection counts as closed, even before close_ok arrives
        $this->setIsConnected(false);

        $expected = array($this->waitHelper->get_wait('connection.close_ok'));

        return $this->wait($expected);
    }
637
638
    /**
     * Handles a close request from the server: confirms it, then raises the
     * reported reply as an exception.
     *
     * @param AMQPReader $args
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException Always
     */
    protected function connection_close($args)
    {
        $reply_code = $args->read_short();
        $reply_text = $args->read_shortstr();
        // class id first, then method id - the read order matters
        $failed_method = array($args->read_short(), $args->read_short());

        $this->x_close_ok();

        throw new AMQPProtocolConnectionException($reply_code, $reply_text, $failed_method);
    }
653
654
    /**
     * Confirms a connection close: sends close_ok, then shuts the connection down locally.
     */
    protected function x_close_ok()
    {
        // The wait helper's value is split on ',' to obtain the method signature
        $signature = explode(',', $this->waitHelper->get_wait('connection.close_ok'));
        $this->send_method_frame($signature);

        $this->do_close();
    }
664
665
    /**
     * Handles the close confirmation by shutting the connection down locally.
     *
     * @param AMQPReader $args Not used; the other connection_* handlers in this
     *                         class take the same single argument
     */
    protected function connection_close_ok($args)
    {
        $this->do_close();
    }
672
673
    /**
     * Sends connection.open for the given virtual host and waits for the reply.
     *
     * @param string $virtual_host
     * @param string $capabilities
     * @param bool $insist
     * @return mixed Result of wait(); connect() treats a non-empty value as a redirect
     */
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $frameArgs = new AMQPWriter();
        $frameArgs->write_shortstr($virtual_host);
        $frameArgs->write_shortstr($capabilities);
        $frameArgs->write_bits(array($insist));
        $this->send_method_frame(array(10, 40), $frameArgs);

        $expected = array($this->waitHelper->get_wait('connection.open_ok'));

        // A redirect reply is only waited for under protocol version 0.8
        if ($this->protocolVersion == '0.8') {
            $expected[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($expected);
    }
697
698
    /**
     * Signals that the connection is ready; stores the known hosts sent by the server.
     *
     * @param AMQPReader $args
     */
    protected function connection_open_ok($args)
    {
        $knownHosts = $args->read_shortstr();
        $this->known_hosts = $knownHosts;

        $this->debug->debug_msg('Open OK! known_hosts: ' . $knownHosts);
    }
708
709
    /**
     * Handles a request from the server to use a different host.
     *
     * @param AMQPReader $args
     * @return string The host the server redirects to
     */
    protected function connection_redirect($args)
    {
        $host = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();

        $message = sprintf('Redirected to [%s], known_hosts [%s]', $host, $this->known_hosts);
        $this->debug->debug_msg($message);

        return $host;
    }
727
728
    /**
     * Security mechanism challenge.
     *
     * The challenge is read from the frame arguments and discarded; this
     * handler does not send a response.
     *
     * @param AMQPReader $args
     */
    protected function connection_secure($args)
    {
        // Keep the read itself; only the unused local variable was dropped
        $args->read_longstr();
    }
737
738
    /**
     * Sends the security mechanism response.
     *
     * @param string $response
     */
    protected function x_secure_ok($response)
    {
        $frameArgs = new AMQPWriter();
        $frameArgs->write_longstr($response);

        $this->send_method_frame(array(10, 21), $frameArgs);
    }
747
748
    /**
     * Starts connection negotiation: records the server's version, properties,
     * mechanisms and locales, then reports them to the debug helper.
     *
     * @param AMQPReader $args
     */
    protected function connection_start($args)
    {
        // The fields are read in the order they appear in the frame
        $this->version_major = $args->read_octet();
        $this->version_minor = $args->read_octet();
        $this->server_properties = $args->read_table();

        // Mechanisms and locales are space-separated lists
        $mechanisms = $args->read_longstr();
        $this->mechanisms = explode(' ', $mechanisms);
        $locales = $args->read_longstr();
        $this->locales = explode(' ', $locales);

        $this->debug->debug_connection_start(
            $this->version_major,
            $this->version_minor,
            $this->server_properties,
            $this->mechanisms,
            $this->locales
        );
    }
769
770
    /**
     * Sends the start-ok reply with the client properties, the chosen
     * mechanism, the login response and the locale.
     *
     * @param array $client_properties
     * @param string $mechanism
     * @param string $response
     * @param string $locale
     */
    protected function x_start_ok($client_properties, $mechanism, $response, $locale)
    {
        $frameArgs = new AMQPWriter();
        $frameArgs->write_table($client_properties);
        $frameArgs->write_shortstr($mechanism);
        $frameArgs->write_longstr($response);
        $frameArgs->write_shortstr($locale);

        $this->send_method_frame(array(10, 11), $frameArgs);
    }
785
786
    /**
787
     * Proposes connection tuning parameters
788
     *
789
     * @param AMQPReader $args
790
     */
791 60
    protected function connection_tune($args)
    {
        // Adopt the channel limit read from the frame; a falsy (zero) value
        // leaves the currently configured limit untouched.
        $channelMax = $args->read_short();
        if ($channelMax) {
            $this->channel_max = $channelMax;
        }

        // Same rule for the frame size. The value read here may not be an
        // int, while $frame_max is declared as one, so cast it explicitly
        // instead of relying on type juggling further down the line.
        $frameMax = $args->read_long();
        if ($frameMax) {
            $this->frame_max = (int) $frameMax;
        }

        // use server proposed value if not set
        if ($this->heartbeat === null) {
            $this->heartbeat = $args->read_short();
        }

        // Reply with the values that are now in effect.
        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
    }
810
811
    /**
812
     * Negotiates connection tuning parameters
813
     *
814
     * @param $channel_max
815
     * @param $frame_max
816
     * @param $heartbeat
817
     */
818 60
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        // Field order matters: channel max (short), frame max (long),
        // heartbeat (short).
        $payload = new AMQPWriter();
        $payload->write_short($channel_max);
        $payload->write_long($frame_max);
        $payload->write_short($heartbeat);

        // array(10, 31) is presumably the class/method id pair of
        // connection.tune-ok -- confirm against the protocol tables.
        $this->send_method_frame(array(10, 31), $payload);

        // The tune-ok reply has been sent, so clear the pending flag.
        $this->wait_tune_ok = false;
    }
827
828
    /**
829
     * @return SocketIO
830
     */
831
    public function getSocket()
    {
        // Delegate to the IO layer, which owns the underlying socket.
        $transport = $this->io;

        return $transport->getSocket();
    }
835
836
    /**
837
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
838
     */
839 50
    protected function getIO()
    {
        // Hand back the IO object this connection was set up with.
        $transport = $this->io;

        return $transport;
    }
843
844
    /**
845
     * Handles connection blocked notifications
846
     *
847
     * @param AMQPReader $args
848
     */
849
    protected function connection_blocked(AMQPReader $args)
    {
        // The frame carries a single short string: the reason for blocking.
        $reason = $args->read_shortstr();

        // Forward the reason to the registered block handler.
        $this->dispatch_to_handler($this->connection_block_handler, array($reason));
    }
854
855
    /**
856
     * Handles connection unblocked notifications
857
     */
858
    protected function connection_unblocked(AMQPReader $args)
    {
        // $args is intentionally unused: an unblock event carries no
        // arguments. The parameter is presumably kept so the signature
        // matches connection_blocked() -- verify against the dispatcher.
        $noArguments = array();

        $this->dispatch_to_handler($this->connection_unblock_handler, $noArguments);
    }
863
864
    /**
865
     * Sets a handler which is called whenever a connection.blocked notification is sent from the server
866
     *
867
     * @param callable $callback
868
     */
869
    public function set_connection_block_handler($callback)
    {
        // Stored as-is; connection_blocked() dispatches to it with the
        // block reason as the only argument.
        $handler = $callback;
        $this->connection_block_handler = $handler;
    }
873
874
    /**
875
     * Sets a handler which is called whenever a connection.unblocked notification is sent from the server
876
     *
877
     * @param callable $callback
878
     */
879
    public function set_connection_unblock_handler($callback)
    {
        // Stored as-is; connection_unblocked() dispatches to it without
        // any arguments.
        $handler = $callback;
        $this->connection_unblock_handler = $handler;
    }
883
884
    /**
885
     * Gets the connection status
886
     *
887
     * @return bool
888
     */
889 60
    public function isConnected()
    {
        // Report the flag maintained through setIsConnected().
        $connected = $this->is_connected;

        return $connected;
    }
893
894
    /**
895
     * Set the connection status
896
     *
897
     * @param bool $is_connected
898
     */
899 60
    protected function setIsConnected($is_connected)
    {
        // Normalise whatever was passed in to a strict boolean before
        // storing it.
        $connected = (bool) $is_connected;
        $this->is_connected = $connected;
    }
903
904
    /**
905
     * Closes all available channels
906
     */
907 60
    protected function closeChannels()
    {
        foreach ($this->channels as $channelId => $openChannel) {
            // channels[0] is this connection object, so it is left alone here.
            if ($channelId !== 0) {
                try {
                    $openChannel->close();
                } catch (\Exception $ignored) {
                    // Deliberately best effort: a channel that fails to
                    // close must not prevent the remaining ones from
                    // being closed.
                }
            }
        }
    }
921
922
    /**
923
     * Should the connection be attempted during construction?
924
     *
925
     * @return bool
926
     */
927 50
    public function connectOnConstruct()
    {
        // This class always answers yes.
        $connectImmediately = true;

        return $connectImmediately;
    }
931
}
932