Completed
Push — master ( 2acff7...79c6d0 )
by John
20:11 queued 12s
created

AbstractConnection::close()   B

Complexity

Conditions 4
Paths 4

Size

Total Lines 27
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 4.0378

Importance

Changes 4
Bugs 1 Features 0
Metric Value
c 4
b 1
f 0
dl 0
loc 27
ccs 13
cts 15
cp 0.8667
rs 8.5806
cc 4
eloc 15
nc 4
nop 3
crap 4.0378
1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AbstractChannel;
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPWriter;
11
use PhpAmqpLib\Wire\IO\AbstractIO;
12
use PhpAmqpLib\Wire\IO\SocketIO;
13
use PhpAmqpLib\Wire\IO\StreamIO;
14
15
class AbstractConnection extends AbstractChannel
16
{
17
    /** @var array */
18
    public static $LIBRARY_PROPERTIES = array(
19
        'product' => array('S', 'AMQPLib'),
20
        'platform' => array('S', 'PHP'),
21
        'version' => array('S', '2.4'),
22
        'information' => array('S', ''),
23
        'copyright' => array('S', ''),
24
        'capabilities' => array(
25
            'F',
26
            array(
27
                'authentication_failure_close' => array('t', true),
28
                'publisher_confirms' => array('t', true),
29
                'consumer_cancel_notify' => array('t', true),
30
                'exchange_exchange_bindings' => array('t', true),
31
                'basic.nack' => array('t', true),
32
                'connection.blocked' => array('t', true)
33
            )
34
        )
35
    );
36
37
    /** @var AMQPChannel[] */
38
    public $channels = array();
39
40
    /** @var int */
41
    protected $version_major;
42
43
    /** @var int */
44
    protected $version_minor;
45
46
    /** @var array */
47
    protected $server_properties;
48
49
    /** @var array */
50
    protected $mechanisms;
51
52
    /** @var array */
53
    protected $locales;
54
55
    /** @var bool */
56
    protected $wait_tune_ok;
57
58
    /** @var string */
59
    protected $known_hosts;
60
61
    /** @var AMQPReader */
62
    protected $input;
63
64
    /** @var string */
65
    protected $vhost;
66
67
    /** @var bool */
68
    protected $insist;
69
70
    /** @var string */
71
    protected $login_method;
72
73
    /** @var string|null */
74
    protected $login_response;
75
76
    /** @var string */
77
    protected $locale;
78
79
    /** @var int */
80
    protected $heartbeat;
81
82
    /** @var SocketIO */
83
    protected $sock;
84
85
    /** @var int */
86
    protected $channel_max = 65535;
87
88
    /** @var int */
89
    protected $frame_max = 131072;
90
91
     /** @var array Constructor parameters for clone */
92
    protected $construct_params;
93
94
    /** @var bool Close the connection in destructor */
95
    protected $close_on_destruct = true;
96
97
    /** @var bool Maintain connection status */
98
    protected $is_connected = false;
99
100
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
101
    protected $io;
102
103
    /** @var \PhpAmqpLib\Wire\AMQPReader */
104
    protected $wait_frame_reader;
105
106
    /** @var callable Handles connection blocking from the server */
107
    private $connection_block_handler;
108
109
    /** @var callable Handles connection unblocking from the server */
110
    private $connection_unblock_handler;
111
112
    /**
113
     * Circular buffer to speed up prepare_content().
114
     * Max size limited by $prepare_content_cache_max_size.
115
     *
116
     * @var array
117
     * @see prepare_content()
118
     */
119
    private $prepare_content_cache;
120
121
    /** @var int Maximal size of $prepare_content_cache */
122
    private $prepare_content_cache_max_size;
123
124
    /**
     * Records the connection settings and connects when connectOnConstruct() allows it.
     *
     * When $user and $password are both truthy, a table holding LOGIN and
     * PASSWORD is serialised and its value, minus the first 4 bytes (the
     * length), becomes the login response. In every other case the login
     * response ends up null, so the $login_response argument is never kept.
     *
     * @param string $user
     * @param string $password
     * @param string $vhost
     * @param bool $insist
     * @param string $login_method
     * @param null $login_response
     * @param string $locale
     * @param AbstractIO $io
     * @param int $heartbeat
     * @throws \Exception
     */
    public function __construct(
        $user,
        $password,
        $vhost = '/',
        $insist = false,
        $login_method = 'AMQPLAIN',
        $login_response = null,
        $locale = 'en_US',
        AbstractIO $io,
        $heartbeat = 0
    ) {
        // __clone() replays these arguments, so capture them before anything else
        $this->construct_params = func_get_args();

        $this->wait_frame_reader = new AMQPReader(null);
        $this->io = $io;
        $this->vhost = $vhost;
        $this->insist = $insist;
        $this->locale = $locale;
        $this->heartbeat = $heartbeat;
        $this->login_method = $login_method;

        $this->login_response = null;
        if ($user && $password) {
            $credentials = new AMQPWriter();
            $credentials->write_table(array(
                'LOGIN' => array('S', $user),
                'PASSWORD' => array('S', $password)
            ));

            // Skip the length
            $serialized = $credentials->getvalue();
            $this->login_response = mb_substr($serialized, 4, mb_strlen($serialized, 'ASCII') - 4, 'ASCII');
        }

        $this->prepare_content_cache_max_size = 100;
        $this->prepare_content_cache = array();

        // Lazy Connection waits on connecting
        if ($this->connectOnConstruct()) {
            $this->connect();
        }
    }
182
183
    /**
     * Connects to the AMQP server.
     *
     * Repeats the whole handshake until x_open() reports no redirect host.
     * Any exception marks the connection as down, closes the channels and is rethrown.
     */
    protected function connect()
    {
        try {
            // Keep going until a handshake completes without a redirect
            while (!$this->isConnected()) {
                // Optimistically flag the connection as up; failures reset it below
                $this->setIsConnected(true);

                // Open the underlying socket/stream
                $this->getIO()->connect();

                $this->channels = array();
                // The connection object itself is treated as channel 0
                parent::__construct($this, 0);

                $this->input = new AMQPReader(null, $this->getIO());

                $this->write($this->amqp_protocol_header);
                $this->wait(array($this->waitHelper->get_wait('connection.start')));
                $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale);

                // Keep waiting for secure/tune until the flag is cleared elsewhere
                $this->wait_tune_ok = true;
                while ($this->wait_tune_ok) {
                    $this->wait(array(
                        $this->waitHelper->get_wait('connection.secure'),
                        $this->waitHelper->get_wait('connection.tune')
                    ));
                }

                $redirectHost = $this->x_open($this->vhost, '', $this->insist);
                if ($redirectHost) {
                    // we were redirected, close the socket, loop and try again
                    $this->setIsConnected(false);
                    $this->closeChannels();
                    $this->close_socket();
                    continue;
                }

                return null; // we weren't redirected
            }
        } catch (\Exception $failure) {
            // Something went wrong, set the connection status
            $this->setIsConnected(false);
            $this->closeChannels();
            throw $failure; // Rethrow exception
        }
    }
234
235
    /**
     * Closes the current connection on a best-effort basis and connects again
     * with the original connection settings.
     * Channels that were established previously are not recreated.
     */
    public function reconnect()
    {
        // Best-effort AMQP-level close of whatever is still open
        $this->safeClose();

        // Bring the socket/stream back first ...
        $this->getIO()->reconnect();

        // ... then force the flag down: getIO can initiate the connection setting
        // via LazyConnection, and connect() only loops while not connected
        $this->setIsConnected(false);
        $this->connect();
    }
248
249
    /**
     * Re-runs the constructor on the clone with the arguments saved at
     * construction time, so the copy connects to the same server.
     */
    public function __clone()
    {
        $constructor = array($this, '__construct');
        call_user_func_array($constructor, $this->construct_params);
    }
256
257
    /**
     * Safely closes the connection on destruction unless that was switched off
     * through set_close_on_destruct().
     */
    public function __destruct()
    {
        if (!$this->close_on_destruct) {
            return;
        }

        $this->safeClose();
    }
263
264
    /**
     * Closes the connection when an input reader is set, swallowing any exception.
     */
    protected function safeClose()
    {
        try {
            if (!empty($this->input)) {
                $this->close();
            }
        } catch (\Exception $ignored) {
            // Deliberately silent: this is a best-effort close
        }
    }
277
278
    /**
     * Delegates to select() on the IO object.
     *
     * @param int $sec
     * @param int $usec
     * @return mixed Whatever the IO object's select() returns
     */
    public function select($sec, $usec = 0)
    {
        $io = $this->getIO();

        return $io->select($sec, $usec);
    }
287
288
    /**
     * Controls whether the destructor closes the connection.
     * Useful after a fork, when the parent process connection must stay open.
     *
     * @param bool $close
     */
    public function set_close_on_destruct($close = true)
    {
        $this->close_on_destruct = $close ? true : false;
    }
298 72
299
    /**
     * Logs the action, then closes and forgets the input reader if one is set.
     */
    protected function close_input()
    {
        $this->debug->debug_msg('closing input');

        if ($this->input === null) {
            return;
        }

        $this->input->close();
        $this->input = null;
    }
308 72
309
    /**
     * Logs the action, then closes the IO object if getIO() returns one.
     */
    protected function close_socket()
    {
        $this->debug->debug_msg('closing socket');

        if ($this->getIO() === null) {
            return;
        }

        $this->getIO()->close();
    }
317
318
    /**
     * Hex-dumps $data through the debug helper and writes it to the IO object.
     * A failed write marks the connection as disconnected before the exception propagates.
     *
     * @param $data
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    public function write($data)
    {
        $this->debug->debug_hexdump($data);

        try {
            $this->getIO()->write($data);
        } catch (AMQPRuntimeException $writeError) {
            $this->setIsConnected(false);
            throw $writeError;
        }
    }
332 72
333
    /**
     * Marks the connection as disconnected, then closes the input reader and the socket.
     */
    protected function do_close()
    {
        // Flag first, so the status is already down while the resources are released
        $this->setIsConnected(false);

        $this->close_input();
        $this->close_socket();
    }
339
340
    /**
     * Returns the lowest id in 1..$channel_max that has no entry in $channels.
     *
     * @return int
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When every id is taken
     */
    public function get_free_channel_id()
    {
        $candidate = 1;
        while ($candidate <= $this->channel_max) {
            if (!isset($this->channels[$candidate])) {
                return $candidate;
            }
            $candidate++;
        }

        throw new AMQPRuntimeException('No free channel ids');
    }
354
355
    /**
     * Builds the content frames through prepare_content() and writes them out.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter $pkt Optional writer to append to; a new one is created when omitted
     */
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        // prepare_content() creates the writer itself when $pkt is null, so the
        // returned instance must be used: calling getvalue() on the original
        // $pkt would be a call on null in that case.
        $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
        $this->write($pkt->getvalue());
    }
369
370
    /**
     * Returns a new AMQPWriter or mutates the provided $pkt.
     *
     * Writes a frame prefix (octet 2, channel, length, class id, weight), the
     * body size, the packed properties and the 0xCE end octet, then the body
     * cut into pieces of at most $frame_max - 8 bytes, each written as its own
     * frame (octet 3, channel, length, payload, 0xCE).
     *
     * The frame prefix is cached per channel/properties/class/weight. Once the
     * cache holds more than $prepare_content_cache_max_size entries, its first
     * entry is dropped.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter $pkt
     * @return AMQPWriter
     */
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        // Content already prepared ?
        $cacheKey = implode('|', array($channel, $packed_properties, $class_id, $weight));

        if (!isset($this->prepare_content_cache[$cacheKey])) {
            $prefix = new AMQPWriter();
            $prefix->write_octet(2);
            $prefix->write_short($channel);
            $prefix->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $prefix->write_short($class_id);
            $prefix->write_short($weight);
            $this->prepare_content_cache[$cacheKey] = $prefix->getvalue();

            // Over capacity: drop the entry at the front of the array
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                reset($this->prepare_content_cache);
                $evictedKey = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$evictedKey]);
            }
        }

        $pkt->write($this->prepare_content_cache[$cacheKey]);
        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);
        $pkt->write_octet(0xCE);

        // memory efficiency: walk the string instead of biting
        // it. good for very large packets (close in size to
        // memory_limit setting)
        $chunkSize = $this->frame_max - 8;
        $bodyLength = mb_strlen($body, 'ASCII');
        for ($offset = 0; $offset < $bodyLength; $offset += $chunkSize) {
            $chunk = mb_substr($body, $offset, $chunkSize, 'ASCII');

            $pkt->write_octet(3);
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($chunk, 'ASCII'));
            $pkt->write($chunk);
            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }
437
438
    /**
     * Builds a method frame through prepare_channel_method_frame(), writes it
     * out and reports the method signature to the debug helper.
     *
     * @param $channel
     * @param $method_sig
     * @param string $args
     * @param null $pkt
     */
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $frame = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);

        $this->write($frame->getvalue());
        $this->debug->debug_method_signature1($method_sig);
    }
450
451
    /**
     * Returns a new AMQPWriter or mutates the provided $pkt.
     *
     * Writes octet 1, the channel, the payload length, the two entries of
     * $method_sig, the arguments and the 0xCE end octet.
     *
     * @param $channel
     * @param $method_sig
     * @param string $args Raw arguments, or an AMQPWriter whose value is used
     * @param AMQPWriter $pkt
     * @return null|AMQPWriter
     */
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        if ($args instanceof AMQPWriter) {
            $args = $args->getvalue();
        }

        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        list($classId, $methodId) = array($method_sig[0], $method_sig[1]);

        $pkt->write_octet(1);
        $pkt->write_short($channel);
        // 4 = length of class_id and method_id in payload
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4);
        $pkt->write_short($classId);
        $pkt->write_short($methodId);
        $pkt->write($args);
        $pkt->write_octet(0xCE);

        $this->debug->debug_method_signature1($method_sig);

        return $pkt;
    }
483
484
    /**
     * Waits for a frame from the server.
     *
     * The reader timeout is switched to $timeout for the duration of the read
     * and restored afterwards, including when the read times out.
     *
     * @param int $timeout
     * @return array Frame type, channel and payload, in that order
     * @throws \Exception
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When there is no input
     *         reader, or the frame does not end with 0xCE
     */
    protected function wait_frame($timeout = 0)
    {
        if ($this->input === null) {
            $this->setIsConnected(false);
            throw new AMQPRuntimeException('Broken pipe or closed connection');
        }

        $previousTimeout = $this->input->getTimeout();
        $this->input->setTimeout($timeout);

        try {
            // frame_type + channel_id + size
            $prefixLength = AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG;
            $this->wait_frame_reader->reuse($this->input->read($prefixLength));

            $frame_type = $this->wait_frame_reader->read_octet();
            $channel = $this->wait_frame_reader->read_short();
            $size = $this->wait_frame_reader->read_long();

            // payload + ch
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

            $payload = $this->wait_frame_reader->read($size);
            $frameEnd = $this->wait_frame_reader->read_octet();
        } catch (AMQPTimeoutException $timedOut) {
            $this->input->setTimeout($previousTimeout);
            throw $timedOut;
        }

        $this->input->setTimeout($previousTimeout);

        if ($frameEnd == 0xCE) {
            return array($frame_type, $channel, $payload);
        }

        throw new AMQPRuntimeException(sprintf(
            'Framing error, unexpected byte: %x',
            $frameEnd
        ));
    }
536
537
    /**
     * Waits for a frame from the server destined for a particular channel.
     *
     * Heartbeat frames are skipped, and a positive timeout is reduced by the
     * time spent on them. Frames for other known channels are queued on those
     * channels.
     *
     * @param string $channel_id
     * @param int $timeout
     * @return array Frame type and payload, in that order
     */
    protected function wait_channel($channel_id, $timeout = 0)
    {
        // Work on a copy so the caller-supplied timeout stays untouched
        $remaining = $timeout;

        for (;;) {
            $startedAt = time();
            list($frame_type, $frame_channel, $payload) = $this->wait_frame($remaining);

            if ($frame_channel === 0 && $frame_type === 8) {
                // skip heartbeat frames and reduce the timeout by the time passed
                if ($remaining > 0) {
                    $remaining -= time() - $startedAt;
                    if ($remaining <= 0) {
                        // If timeout has been reached, throw the exception without calling wait_frame
                        throw new AMQPTimeoutException("Timeout waiting on channel");
                    }
                }
                continue;
            }

            if ($frame_channel == $channel_id) {
                return array($frame_type, $payload);
            }

            // Not the channel we were looking for. Queue this frame for later,
            // when the other channel is looking for frames. Make sure the
            // channel still exists, it could have been closed by a previous
            // Exception.
            if (isset($this->channels[$frame_channel])) {
                array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
            }

            // If we just queued up a method for channel 0 (the Connection
            // itself) it's probably a close method in reaction to some
            // error, so deal with it right away.
            if (($frame_type == 1) && ($frame_channel == 0)) {
                $this->wait();
            }
        }
    }
586
587
    /**
     * Fetches the channel object registered under $channel_id, or creates and
     * registers one if it doesn't already exist. A falsy $channel_id is
     * replaced by the result of get_free_channel_id().
     *
     * @param string $channel_id
     * @return AMQPChannel
     */
    public function channel($channel_id = null)
    {
        if (isset($this->channels[$channel_id])) {
            return $this->channels[$channel_id];
        }

        if (!$channel_id) {
            $channel_id = $this->get_free_channel_id();
        }

        $created = new AMQPChannel($this->connection, $channel_id);
        $this->channels[$channel_id] = $created;

        return $created;
    }
607
608
    /**
     * Requests a connection close.
     *
     * Heartbeats are disabled first when the IO object is a StreamIO. Nothing
     * else happens, and null is returned, when there is no protocol writer or
     * the connection is not marked as connected.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig
     * @return mixed|null
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        if ($this->io instanceof StreamIO) {
            $this->io->disableHeartbeat();
        }

        $canClose = $this->protocolWriter && $this->isConnected();
        if (!$canClose) {
            return null;
        }

        $this->closeChannels();

        $closeFrame = $this->protocolWriter->connectionClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        list($class_id, $method_id, $args) = $closeFrame;
        $this->send_method_frame(array($class_id, $method_id), $args);

        $this->setIsConnected(false);

        $closeOk = $this->waitHelper->get_wait('connection.close_ok');

        return $this->wait(array($closeOk));
    }
643
644
    /**
     * Reads the reply code, reply text, class id and method id from $args,
     * confirms the close and then throws them as an exception.
     *
     * @param AMQPReader $args
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException
     */
    protected function connection_close($args)
    {
        $reply_code = $args->read_short();
        $reply_text = $args->read_shortstr();
        // Element order matters: class id is read before method id
        $failedMethod = array($args->read_short(), $args->read_short());

        $this->x_close_ok();

        throw new AMQPProtocolConnectionException($reply_code, $reply_text, $failedMethod);
    }
659
660
    /**
     * Confirms a connection close: sends the close_ok method frame, then
     * releases the local resources.
     */
    protected function x_close_ok()
    {
        $closeOkSignature = explode(',', $this->waitHelper->get_wait('connection.close_ok'));

        $this->send_method_frame($closeOkSignature);
        $this->do_close();
    }
670 72
671 72
    /**
     * Handles a connection close confirmation by releasing the local resources.
     *
     * @param mixed $args Not read by this method
     */
    protected function connection_close_ok($args)
    {
        $this->do_close();
    }
678
679 72
    /**
     * Sends method (10, 40) with the virtual host, capabilities and insist bit,
     * then waits for open_ok, or additionally for redirect on protocol 0.8.
     *
     * @param string $virtual_host
     * @param string $capabilities
     * @param bool $insist
     * @return mixed Result of wait()
     */
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $openArgs = new AMQPWriter();
        $openArgs->write_shortstr($virtual_host);
        $openArgs->write_shortstr($capabilities);
        $openArgs->write_bits(array($insist));
        $this->send_method_frame(array(10, 40), $openArgs);

        $expected = array($this->waitHelper->get_wait('connection.open_ok'));
        if ($this->protocolVersion == '0.8') {
            $expected[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($expected);
    }
703 72
704
    /**
     * Signals that the connection is ready; stores and logs the known hosts
     * string read from $args.
     *
     * @param AMQPReader $args
     */
    protected function connection_open_ok($args)
    {
        $knownHosts = $args->read_shortstr();

        $this->known_hosts = $knownHosts;
        $this->debug->debug_msg('Open OK! known_hosts: ' . $knownHosts);
    }
714
715
    /**
     * Asks the client to use a different server.
     *
     * Reads the target host and the known hosts string, in that order, stores
     * the latter and logs both.
     *
     * @param AMQPReader $args
     * @return string The host read from $args
     */
    protected function connection_redirect($args)
    {
        $host = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();

        $message = sprintf(
            'Redirected to [%s], known_hosts [%s]',
            $host,
            $this->known_hosts
        );
        $this->debug->debug_msg($message);

        return $host;
    }
733
734
    /**
     * Security mechanism challenge.
     *
     * @param AMQPReader $args
     */
    protected function connection_secure($args)
    {
        // The challenge is read from $args and discarded; nothing here uses it
        $args->read_longstr();
    }
743
744
    /**
     * Security mechanism response: sends $response as a long string in
     * method (10, 21).
     *
     * @param string $response
     */
    protected function x_secure_ok($response)
    {
        $secureArgs = new AMQPWriter();
        $secureArgs->write_longstr($response);

        $this->send_method_frame(array(10, 21), $secureArgs);
    }
753 72
754
    /**
     * Starts connection negotiation.
     *
     * Reads, in order, the major and minor version, the server properties
     * table, and the space-separated mechanisms and locales, stores them on
     * the connection and passes them to the debug helper.
     *
     * @param AMQPReader $args
     */
    protected function connection_start($args)
    {
        // The read order below is fixed by the layout of $args
        $this->version_major = $args->read_octet();
        $this->version_minor = $args->read_octet();
        $this->server_properties = $args->read_table();

        $mechanismList = $args->read_longstr();
        $this->mechanisms = explode(' ', $mechanismList);

        $localeList = $args->read_longstr();
        $this->locales = explode(' ', $localeList);

        $this->debug->debug_connection_start(
            $this->version_major,
            $this->version_minor,
            $this->server_properties,
            $this->mechanisms,
            $this->locales
        );
    }
775
776 72
    /**
777
     * @param $client_properties
778 72
     * @param $mechanism
779 72
     * @param $response
780 72
     * @param $locale
781 72
     */
782 72
    protected function x_start_ok($client_properties, $mechanism, $response, $locale)
    {
        // Serialise the four arguments in the order they were passed in:
        // table, short string, long string, short string.
        $frame = new AMQPWriter();
        $frame->write_table($client_properties);
        $frame->write_shortstr($mechanism);
        $frame->write_longstr($response);
        $frame->write_shortstr($locale);

        // (10, 11) is the method signature; presumably connection.start-ok
        // in AMQP 0-9-1 terms -- confirm against the protocol tables.
        $this->send_method_frame(array(10, 11), $frame);
    }
791 72
792
    /**
793 72
     * Proposes connection tuning parameters
794 72
     *
795
     * @param AMQPReader $args
796
     */
797
    protected function connection_tune($args)
    {
        // A falsy proposal (e.g. zero) leaves the current limit untouched.
        $proposedChannelMax = $args->read_short();
        if ($proposedChannelMax) {
            $this->channel_max = $proposedChannelMax;
        }

        $proposedFrameMax = $args->read_long();
        if ($proposedFrameMax) {
            $this->frame_max = $proposedFrameMax;
        }

        // Only adopt the server's heartbeat when no value has been set yet;
        // otherwise the remaining short is left unread.
        if (null === $this->heartbeat) {
            $this->heartbeat = $args->read_short();
        }

        // Reply with the values that are now in effect.
        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
    }
816
817
    /**
818 72
     * Negotiates connection tuning parameters
819
     *
820 72
     * @param $channel_max
821 72
     * @param $frame_max
822 72
     * @param $heartbeat
823 72
     */
824 72
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        // Serialise the three values as short, long, short.
        $frame = new AMQPWriter();
        $frame->write_short($channel_max);
        $frame->write_long($frame_max);
        $frame->write_short($heartbeat);

        // (10, 31) is the method signature; presumably connection.tune-ok
        // in AMQP 0-9-1 terms -- confirm against the protocol tables.
        $this->send_method_frame(array(10, 31), $frame);

        // The frame has been sent, so clear the pending-tune flag.
        $this->wait_tune_ok = false;
    }
833
834
    /**
835
     * @return SocketIO
836
     */
837
    public function getSocket()
    {
        // Delegate to the IO object held by this connection.
        $io = $this->io;

        return $io->getSocket();
    }
841 60
842
    /**
843
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
844
     */
845
    protected function getIO()
    {
        // Plain accessor for the IO object held by this connection.
        return $this->io;
    }
849
850
    /**
851
     * Handles connection blocked notifications
852
     *
853
     * @param AMQPReader $args
854
     */
855
    protected function connection_blocked(AMQPReader $args)
    {
        // The single short string in the payload is the reason for blocking;
        // it is forwarded as the only argument to the registered block handler.
        $reason = $args->read_shortstr();

        $this->dispatch_to_handler($this->connection_block_handler, array($reason));
    }
860
861
    /**
862
     * Handles connection unblocked notifications
863
     */
864
    protected function connection_unblocked(AMQPReader $args)
    {
        // $args is never read here: the unblock handler is invoked with an
        // empty argument list. The parameter is kept so the signature matches
        // connection_blocked().
        $noArguments = array();

        $this->dispatch_to_handler($this->connection_unblock_handler, $noArguments);
    }
869
870
    /**
871
     * Sets a handler which is called whenever a connection.block is sent from the server
872
     *
873
     * @param callable $callback
874
     */
875
    public function set_connection_block_handler($callback)
    {
        // Stored as-is; connection_blocked() later dispatches to it.
        $this->connection_block_handler = $callback;
    }
879
880
    /**
881
     * Sets a handler which is called whenever a connection.unblocked is sent from the server
882
     *
883
     * @param callable $callback
884
     */
885
    public function set_connection_unblock_handler($callback)
    {
        // Stored as-is; connection_unblocked() later dispatches to it.
        $this->connection_unblock_handler = $callback;
    }
889 72
890
    /**
891 72
     * Gets the connection status
892
     *
893
     * @return bool
894
     */
895
    public function isConnected()
    {
        // Reports the flag maintained through setIsConnected().
        return $this->is_connected;
    }
899 72
900
    /**
901 72
     * Set the connection status
902 72
     *
903
     * @param bool $is_connected
904
     */
905
    protected function setIsConnected($is_connected)
    {
        // Normalise whatever was passed to a strict boolean before storing it.
        $connected = (bool) $is_connected;

        $this->is_connected = $connected;
    }
909 72
910
    /**
911 72
     * Closes all available channels
912 72
     */
913
    protected function closeChannels()
    {
        foreach ($this->channels as $channelId => $openChannel) {
            // channels[0] is this connection object, so it is not closed yet;
            // only the remaining entries are closed here.
            if ($channelId !== 0) {
                try {
                    $openChannel->close();
                } catch (\Exception $ignored) {
                    // Best effort: errors while closing a channel are deliberately ignored
                }
            }
        }
    }
927 60
928
    /**
929 60
     * Should the connection be attempted during construction?
930
     *
931
     * @return bool
932 4
     */
933
    public function connectOnConstruct()
    {
        // This class always answers yes.
        $attemptDuringConstruction = true;

        return $attemptDuringConstruction;
    }
937
}
938