Completed
Push — master ( 1dcdce...982923 )
by
unknown
07:40 queued 19s
created

AbstractConnection::checkHeartBeat()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 2
1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AbstractChannel;
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
7
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
8
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
9
use PhpAmqpLib\Exception\AMQPIOException;
10
use PhpAmqpLib\Exception\AMQPNoDataException;
11
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
12
use PhpAmqpLib\Exception\AMQPRuntimeException;
13
use PhpAmqpLib\Exception\AMQPSocketException;
14
use PhpAmqpLib\Exception\AMQPTimeoutException;
15
use PhpAmqpLib\Wire\AMQPReader;
16
use PhpAmqpLib\Wire\AMQPTable;
17
use PhpAmqpLib\Wire\AMQPWriter;
18
use PhpAmqpLib\Wire\IO\AbstractIO;
19
20
class AbstractConnection extends AbstractChannel
21
{
22
    /**
23
     * @var array
24
     * @internal
25
     */
26
    public static $LIBRARY_PROPERTIES = array(
27
        'product' => array('S', 'AMQPLib'),
28
        'platform' => array('S', 'PHP'),
29
        'version' => array('S', '2.9'),
30
        'information' => array('S', ''),
31
        'copyright' => array('S', ''),
32
        'capabilities' => array(
33
            'F',
34
            array(
35
                'authentication_failure_close' => array('t', true),
36
                'publisher_confirms' => array('t', true),
37
                'consumer_cancel_notify' => array('t', true),
38
                'exchange_exchange_bindings' => array('t', true),
39
                'basic.nack' => array('t', true),
40
                'connection.blocked' => array('t', true)
41
            )
42
        )
43
    );
44
45
    /**
46
     * @var AMQPChannel[]
47
     * @internal
48
     */
49
    public $channels = array();
50
51
    /** @var int */
52
    protected $version_major;
53
54
    /** @var int */
55
    protected $version_minor;
56
57
    /** @var array */
58
    protected $server_properties;
59
60
    /** @var array */
61
    protected $mechanisms;
62
63
    /** @var array */
64
    protected $locales;
65
66
    /** @var bool */
67
    protected $wait_tune_ok;
68
69
    /** @var string */
70
    protected $known_hosts;
71
72
    /** @var AMQPReader */
73
    protected $input;
74
75
    /** @var string */
76
    protected $vhost;
77
78
    /** @var bool */
79
    protected $insist;
80
81
    /** @var string */
82
    protected $login_method;
83
84
    /** @var string */
85
    protected $login_response;
86
87
    /** @var string */
88
    protected $locale;
89
90
    /** @var int */
91
    protected $heartbeat;
92
93
    /** @var float */
94
    protected $last_frame;
95
96
    /** @var int */
97
    protected $channel_max = 65535;
98
99
    /** @var int */
100
    protected $frame_max = 131072;
101
102
     /** @var array Constructor parameters for clone */
103
    protected $construct_params;
104
105
    /** @var bool Close the connection in destructor */
106
    protected $close_on_destruct = true;
107
108
    /** @var bool Maintain connection status */
109
    protected $is_connected = false;
110
111
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
112
    protected $io;
113
114
    /** @var \PhpAmqpLib\Wire\AMQPReader */
115
    protected $wait_frame_reader;
116
117
    /** @var callable Handles connection blocking from the server */
118
    private $connection_block_handler;
119
120
    /** @var callable Handles connection unblocking from the server */
121
    private $connection_unblock_handler;
122
123
    /** @var int Connection timeout value*/
124
    protected $connection_timeout ;
125
126
    /**
127
     * Circular buffer to speed up prepare_content().
128
     * Max size limited by $prepare_content_cache_max_size.
129
     *
130
     * @var array
131
     * @see prepare_content()
132
     */
133
    private $prepare_content_cache = array();
134
135
    /** @var int Maximal size of $prepare_content_cache */
136
    private $prepare_content_cache_max_size = 100;
137
138
    /**
139
     * Maximum time to wait for channel operations, in seconds
140
     * @var float $channel_rpc_timeout
141
     */
142
    private $channel_rpc_timeout;
143
144
    /**
145
     * @param string $user
146
     * @param string $password
147
     * @param string $vhost
148
     * @param bool $insist
149
     * @param string $login_method
150
     * @param null $login_response
151
     * @param string $locale
152
     * @param AbstractIO $io
153
     * @param int $heartbeat
154
     * @param int $connection_timeout
155
     * @param float $channel_rpc_timeout
156
     * @throws \Exception
157
     */
158 186
    public function __construct(
159
        $user,
160
        $password,
161
        $vhost = '/',
162
        $insist = false,
163
        $login_method = 'AMQPLAIN',
164
        $login_response = null,
165
        $locale = 'en_US',
166
        AbstractIO $io,
167
        $heartbeat = 60,
168
        $connection_timeout = 0,
169
        $channel_rpc_timeout = 0.0
170
    ) {
171
        // save the params for the use of __clone
172 186
        $this->construct_params = func_get_args();
173
174 186
        $this->wait_frame_reader = new AMQPReader(null);
175 186
        $this->vhost = $vhost;
176 186
        $this->insist = $insist;
177 186
        $this->login_method = $login_method;
178 186
        $this->login_response = $login_response;
179 186
        $this->locale = $locale;
180 186
        $this->io = $io;
181 186
        $this->heartbeat = $heartbeat;
182 186
        $this->connection_timeout = $connection_timeout;
183 186
        $this->channel_rpc_timeout = $channel_rpc_timeout;
184
185 186
        if ($user && $password) {
186 186
            $this->login_response = new AMQPWriter();
0 ignored issues
show
Documentation Bug introduced by
It seems like new \PhpAmqpLib\Wire\AMQPWriter() of type object<PhpAmqpLib\Wire\AMQPWriter> is incompatible with the declared type string of property $login_response.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
187 186
            $this->login_response->write_table(array(
188 186
                'LOGIN' => array('S', $user),
189 186
                'PASSWORD' => array('S', $password)
190 93
            ));
191
192
            // Skip the length
193 186
            $responseValue = $this->login_response->getvalue();
194 186
            $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
195
196 93
        } else {
197
            $this->login_response = null;
198
        }
199
200
        // Lazy Connection waits on connecting
201 186
        if ($this->connectOnConstruct()) {
202 162
            $this->connect();
203 78
        }
204 180
    }
205
206
    /**
207
     * Connects to the AMQP server
208
     */
209 186
    protected function connect()
210
    {
211
        try {
212
            // Loop until we connect
213 186
            while (!$this->isConnected()) {
214
                // Assume we will connect, until we dont
215 186
                $this->setIsConnected(true);
216
217
                // Connect the socket
218 186
                $this->io->connect();
219
220 180
                $this->channels = array();
221
                // The connection object itself is treated as channel 0
222 180
                parent::__construct($this, 0);
0 ignored issues
show
Comprehensibility Bug introduced by
It seems like you call parent on a different method (__construct() instead of connect()). Are you sure this is correct? If so, you might want to change this to $this->__construct().

This check looks for a call to a parent method whose name is different than the method from which it is called.

Consider the following code:

class Daddy
{
    protected function getFirstName()
    {
        return "Eidur";
    }

    protected function getSurName()
    {
        return "Gudjohnsen";
    }
}

class Son
{
    public function getFirstName()
    {
        return parent::getSurname();
    }
}

The getFirstName() method in the Son calls the wrong method in the parent class.

Loading history...
223
224 180
                $this->input = new AMQPReader(null, $this->io);
225
226 180
                $this->write($this->amqp_protocol_header);
227 180
                $this->wait(array($this->waitHelper->get_wait('connection.start')),false,$this->connection_timeout);
228 180
                $this->x_start_ok(
229 180
                    $this->getLibraryProperties(),
230 180
                    $this->login_method,
231 180
                    $this->login_response,
232 180
                    $this->locale
233 90
                );
234
235 180
                $this->wait_tune_ok = true;
236 180
                while ($this->wait_tune_ok) {
237 180
                    $this->wait(array(
238 180
                        $this->waitHelper->get_wait('connection.secure'),
239 180
                        $this->waitHelper->get_wait('connection.tune')
240 90
                    ));
241 90
                }
242
243 180
                $host = $this->x_open($this->vhost, '', $this->insist);
244 180
                if (!$host) {
245
                    //Reconnected
246 180
                    $this->io->reenableHeartbeat();
247 180
                    return null; // we weren't redirected
248
                }
249
250
                $this->setIsConnected(false);
251
                $this->closeChannels();
252
253
                // we were redirected, close the socket, loop and try again
254
                $this->close_socket();
255
            }
256
257 18
        } catch (\Exception $e) {
258
            // Something went wrong, set the connection status
259 6
            $this->setIsConnected(false);
260 6
            $this->closeChannels();
261 6
            $this->close_input();
262 6
            $this->close_socket();
263 6
            throw $e; // Rethrow exception
264
        }
265 24
    }
266
267
    /**
268
     * Reconnects using the original connection settings.
269
     * This will not recreate any channels that were established previously
270
     */
271 36
    public function reconnect()
272
    {
273
        // Try to close the AMQP connection
274 36
        $this->safeClose();
275
        // Reconnect the socket/stream then AMQP
276 36
        $this->io->close();
277 36
        $this->setIsConnected(false); // getIO can initiate the connection setting via LazyConnection, set it here to be sure
278 36
        $this->connect();
279 36
    }
280
281
    /**
282
     * Cloning will use the old properties to make a new connection to the same server
283
     */
284
    public function __clone()
285
    {
286
        call_user_func_array(array($this, '__construct'), $this->construct_params);
287
    }
288
289 12
    public function __destruct()
290
    {
291 12
        if ($this->close_on_destruct) {
292 12
            $this->safeClose();
293 6
        }
294 12
    }
295
296
    /**
297
     * Attempts to close the connection safely
298
     */
299 48
    protected function safeClose()
300
    {
301
        try {
302 48
            if (isset($this->input) && $this->input) {
303 33
                $this->close();
304 9
            }
305 24
        } catch (\Exception $e) {
306
            // Nothing here
307
        }
308 48
    }
309
310
    /**
311
     * @param int $sec
312
     * @param int $usec
313
     * @return mixed
314
     */
315 View Code Duplication
    public function select($sec, $usec = 0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
316
    {
317
        try {
318
            return $this->io->select($sec, $usec);
319
        } catch (AMQPConnectionClosedException $e) {
320
            $this->do_close();
321
            throw $e;
322
        } catch (AMQPRuntimeException $e) {
323
            $this->setIsConnected(false);
324
            throw $e;
325
        }
326
    }
327
328
    /**
329
     * Allows to not close the connection
330
     * it's useful after the fork when you don't want to close parent process connection
331
     *
332
     * @param bool $close
333
     */
334
    public function set_close_on_destruct($close = true)
335
    {
336
        $this->close_on_destruct = (bool) $close;
337
    }
338
339 150
    protected function close_input()
340
    {
341 150
        $this->debug && $this->debug->debug_msg('closing input');
342
343 150
        if (!is_null($this->input)) {
344 144
            $this->input->close();
345 144
            $this->input = null;
346 72
        }
347 150
    }
348
349 150
    protected function close_socket()
350
    {
351 150
        $this->debug && $this->debug->debug_msg('closing socket');
352
353 150
        if ($this->io) {
354 150
            $this->io->close();
355 75
        }
356 150
    }
357
358
    /**
359
     * @param string $data
360
     */
361 180 View Code Duplication
    public function write($data)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
362
    {
363 180
        $this->debug->debug_hexdump($data);
364
365
        try {
366 180
            $this->io->write($data);
367 90
        } catch (AMQPConnectionClosedException $e) {
368
            $this->do_close();
369
            throw $e;
370
        } catch (AMQPRuntimeException $e) {
371
            $this->setIsConnected(false);
372
            throw $e;
373
        }
374 180
    }
375
376 144
    protected function do_close()
377
    {
378 144
        $this->setIsConnected(false);
379 144
        $this->close_input();
380 144
        $this->close_socket();
381 144
    }
382
383
    /**
384
     * @return int
385
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
386
     */
387 174
    public function get_free_channel_id()
388
    {
389 174
        for ($i = 1; $i <= $this->channel_max; $i++) {
390 174
            if (!isset($this->channels[$i])) {
391 174
                return $i;
392
            }
393 9
        }
394
395
        throw new AMQPRuntimeException('No free channel ids');
396
    }
397
398
    /**
399
     * @param string $channel
400
     * @param int $class_id
401
     * @param int $weight
402
     * @param int $body_size
403
     * @param string $packed_properties
404
     * @param string $body
405
     * @param AMQPWriter $pkt
406
     */
407 66
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
408
    {
409 66
        $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
410 66
        $this->write($pkt->getvalue());
0 ignored issues
show
Bug introduced by
It seems like $pkt is not always an object, but can also be of type null. Maybe add an additional type check?

If a variable is not always an object, we recommend to add an additional type check to ensure your method call is safe:

function someFunction(A $objectMaybe = null)
{
    if ($objectMaybe instanceof A) {
        $objectMaybe->doSomething();
    }
}
Loading history...
411 66
    }
412
413
    /**
414
     * Returns a new AMQPWriter or mutates the provided $pkt
415
     *
416
     * @param string $channel
417
     * @param int $class_id
418
     * @param int $weight
419
     * @param int $body_size
420
     * @param string $packed_properties
421
     * @param string $body
422
     * @param AMQPWriter $pkt
423
     * @return AMQPWriter
424
     */
425 66
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
426
    {
427 66
        $pkt = $pkt ?: new AMQPWriter();
428
429
        // Content already prepared ?
430 66
        $key_cache = sprintf(
431 66
            '%s|%s|%s|%s',
432 66
            $channel,
433 66
            $packed_properties,
434 66
            $class_id,
435 33
            $weight
436 33
        );
437
438 66
        if (!isset($this->prepare_content_cache[$key_cache])) {
439 66
            $w = new AMQPWriter();
440 66
            $w->write_octet(2);
441 66
            $w->write_short($channel);
442 66
            $w->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
443 66
            $w->write_short($class_id);
444 66
            $w->write_short($weight);
445 66
            $this->prepare_content_cache[$key_cache] = $w->getvalue();
446 66
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
447
                reset($this->prepare_content_cache);
448
                $old_key = key($this->prepare_content_cache);
449
                unset($this->prepare_content_cache[$old_key]);
450
            }
451 33
        }
452 66
        $pkt->write($this->prepare_content_cache[$key_cache]);
453
454 66
        $pkt->write_longlong($body_size);
455 66
        $pkt->write($packed_properties);
456
457 66
        $pkt->write_octet(0xCE);
458
459
460
        // memory efficiency: walk the string instead of biting
461
        // it. good for very large packets (close in size to
462
        // memory_limit setting)
463 66
        $position = 0;
464 66
        $bodyLength = mb_strlen($body,'ASCII');
465 66
        while ($position < $bodyLength) {
466 60
            $payload = mb_substr($body, $position, $this->frame_max - 8, 'ASCII');
467 60
            $position += $this->frame_max - 8;
468
469 60
            $pkt->write_octet(3);
470 60
            $pkt->write_short($channel);
471 60
            $pkt->write_long(mb_strlen($payload, 'ASCII'));
472
473 60
            $pkt->write($payload);
474
475 60
            $pkt->write_octet(0xCE);
476 30
        }
477
478 66
        return $pkt;
479
    }
480
481
    /**
482
     * @param string $channel
483
     * @param array $method_sig
484
     * @param AMQPWriter|string $args
485
     * @param null $pkt
486
     */
487 180
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
488
    {
489 180
        $pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
490 180
        $this->write($pkt->getvalue());
491 180
        $this->debug->debug_method_signature1($method_sig);
0 ignored issues
show
Documentation introduced by
$method_sig is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
492 180
    }
493
494
    /**
495
     * Returns a new AMQPWriter or mutates the provided $pkt
496
     *
497
     * @param string $channel
498
     * @param array $method_sig
499
     * @param AMQPWriter|string $args
500
     * @param AMQPWriter $pkt
501
     * @return AMQPWriter
502
     */
503 180
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
504
    {
505 180
        if ($args instanceof AMQPWriter) {
506 180
            $args = $args->getvalue();
507 90
        }
508
509 180
        $pkt = $pkt ?: new AMQPWriter();
510
511 180
        $pkt->write_octet(1);
512 180
        $pkt->write_short($channel);
513 180
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4); // 4 = length of class_id and method_id
514
        // in payload
515
516 180
        $pkt->write_short($method_sig[0]); // class_id
517 180
        $pkt->write_short($method_sig[1]); // method_id
518 180
        $pkt->write($args);
519
520 180
        $pkt->write_octet(0xCE);
521
522 180
        $this->debug->debug_method_signature1($method_sig);
0 ignored issues
show
Documentation introduced by
$method_sig is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
523
524 180
        return $pkt;
525
    }
526
527
    /**
528
     * Waits for a frame from the server
529
     *
530
     * @param int|float|null $timeout
531
     * @return array
532
     * @throws \Exception
533
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
534
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
535
     */
536 180
    protected function wait_frame($timeout = 0)
537
    {
538 180
        if (is_null($this->input))
539 90
        {
540
            $this->setIsConnected(false);
541
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
542
        }
543
544 180
        $currentTimeout = $this->input->getTimeout();
545 180
        $this->input->setTimeout($timeout);
546
547
        try {
548
            // frame_type + channel_id + size
549 180
            $this->wait_frame_reader->reuse(
550 180
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
551 90
            );
552
553 180
            $frame_type = $this->wait_frame_reader->read_octet();
554 180
            $class = self::$PROTOCOL_CONSTANTS_CLASS;
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Channel\Abstr...ROTOCOL_CONSTANTS_CLASS has been deprecated.

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
555 180
            if (!array_key_exists($frame_type, $class::$FRAME_TYPES)) {
556
                throw new AMQPInvalidFrameException('Invalid frame type ' . $frame_type);
557
            }
558 180
            $channel = $this->wait_frame_reader->read_short();
559 180
            $size = $this->wait_frame_reader->read_long();
560
561
            // payload + ch
562 180
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));
563
564 180
            $payload = $this->wait_frame_reader->read($size);
565 180
            $ch = $this->wait_frame_reader->read_octet();
566
567 130
        } catch (AMQPTimeoutException $e) {
568 30
            $this->input->setTimeout($currentTimeout);
569 30
            throw $e;
570 49
        } catch (AMQPNoDataException $e) {
571 49
            if ($this->input) {
572 49
                $this->input->setTimeout($currentTimeout);
573 24
            }
574 49
            throw $e;
575
        } catch (AMQPConnectionClosedException $exception) {
576
            $this->do_close();
577
            throw $exception;
578
        }
579
580 180
        $this->input->setTimeout($currentTimeout);
581
582 180
        if ($ch != 0xCE) {
583
            throw new AMQPInvalidFrameException(sprintf(
584
                'Framing error, unexpected byte: %x',
585
                $ch
586
            ));
587
        }
588
589 180
        return array($frame_type, $channel, $payload);
590
    }
591
592
    /**
593
     * Waits for a frame from the server destined for a particular channel.
594
     *
595
     * @param string $channel_id
596
     * @param int|float|null $timeout
597
     * @return array
598
     */
599 180
    protected function wait_channel($channel_id, $timeout = 0)
600
    {
601
        // Keeping the original timeout unchanged.
602 180
        $_timeout = $timeout;
603 180
        while (true) {
604 180
            $now = time();
605
            try {
606 180
                list($frame_type, $frame_channel, $payload) = $this->wait_frame($_timeout);
607 130
            } catch (AMQPTimeoutException $e) {
608 30
                if ( $this->heartbeat && microtime(true) - ($this->heartbeat*2) > $this->last_frame ) {
609
                    $this->debug->debug_msg("missed server heartbeat (at threshold * 2)");
610
                    $this->setIsConnected(false);
611
                    throw new AMQPHeartbeatMissedException("Missed server heartbeat");
612
                }
613
614 30
                throw $e;
615
            }
616
617 180
            $this->last_frame = microtime(true);
618
619 180
            if ($frame_channel === 0 && $frame_type === 8) {
620
                // skip heartbeat frames and reduce the timeout by the time passed
621
                $this->debug->debug_msg("received server heartbeat");
622
                if($_timeout > 0) {
623
                    $_timeout -= time() - $now;
624
                    if($_timeout <= 0) {
625
                        // If timeout has been reached, throw the exception without calling wait_frame
626
                        throw new AMQPTimeoutException("Timeout waiting on channel");
627
                    }
628
                }
629
                continue;
630
631
            } else {
632
633 180
                if ($frame_channel == $channel_id) {
634 180
                    return array($frame_type, $payload);
635
                }
636
637
                // Not the channel we were looking for.  Queue this frame
638
                //for later, when the other channel is looking for frames.
639
                // Make sure the channel still exists, it could have been
640
                // closed by a previous Exception.
641 6
                if (isset($this->channels[$frame_channel])) {
642 6
                    array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
0 ignored issues
show
Bug introduced by
The property frame_queue cannot be accessed from this context as it is declared protected in class PhpAmqpLib\Channel\AbstractChannel.

This check looks for access to properties that are not accessible from the current context.

If you need to make a property accessible to another context you can either raise its visibility level or provide an accessible getter in the defining class.

Loading history...
643 3
                }
644
645
                // If we just queued up a method for channel 0 (the Connection
646
                // itself) it's probably a close method in reaction to some
647
                // error, so deal with it right away.
648 6
                if (($frame_type == 1) && ($frame_channel == 0)) {
649
                    $this->wait();
650
                }
651
            }
652 3
        }
653
    }
654
655
    /**
656
     * Fetches a channel object identified by the numeric channel_id, or
657
     * create that object if it doesn't already exist.
658
     *
659
     * @param int $channel_id
660
     * @return AMQPChannel
661
     */
662 174
    public function channel($channel_id = null)
663
    {
664 174
        if (isset($this->channels[$channel_id])) {
665
            return $this->channels[$channel_id];
666
        }
667
668 174
        $channel_id = $channel_id ? $channel_id : $this->get_free_channel_id();
669 174
        $ch = new AMQPChannel($this->connection, $channel_id, true, $this->channel_rpc_timeout);
670 174
        $this->channels[$channel_id] = $ch;
671
672 174
        return $ch;
673
    }
674
675
    /**
676
     * Requests a connection close
677
     *
678
     * @param int $reply_code
679
     * @param string $reply_text
680
     * @param array $method_sig
681
     * @return mixed|null
682
     */
683 144
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
684
    {
685 144
        $result = null;
686 144
        $this->io->disableHeartbeat();
687 144
        if (empty($this->protocolWriter) || !$this->isConnected()) {
688 18
            return $result;
689
        }
690
691
        try {
692 144
            $this->closeChannels();
693 144
            list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
694 144
                $reply_code,
695 144
                $reply_text,
696 144
                $method_sig[0],
697 144
                $method_sig[1]
698 72
            );
699 144
            $this->send_method_frame(array($class_id, $method_id), $args);
700 144
            $result = $this->wait(
701 144
                array($this->waitHelper->get_wait('connection.close_ok')),
702 144
                false,
703 144
                $this->connection_timeout
704 72
            );
705 72
        } catch (\Exception $exception) {
706
            $this->do_close();
707
            throw $exception;
708
        }
709
710 144
        $this->setIsConnected(false);
711
712 144
        return $result;
713
    }
714
715
    /**
716
     * @param AMQPReader $reader
717
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException
718
     */
719
    protected function connection_close(AMQPReader $reader)
720
    {
721
        $reply_code = $reader->read_short();
722
        $reply_text = $reader->read_shortstr();
723
        $class_id = $reader->read_short();
724
        $method_id = $reader->read_short();
725
726
        $this->x_close_ok();
727
728
        throw new AMQPProtocolConnectionException($reply_code, $reply_text, array($class_id, $method_id));
729
    }
730
731
    /**
732
     * Confirms a connection close
733
     */
734
    protected function x_close_ok()
735
    {
736
        $this->send_method_frame(
737
            explode(',', $this->waitHelper->get_wait('connection.close_ok'))
738
        );
739
        $this->do_close();
740
    }
741
742
    /**
743
     * Confirm a connection close
744
     *
745
     * @param AMQPReader $args
746
     */
747 144
    protected function connection_close_ok($args)
0 ignored issues
show
Unused Code introduced by
The parameter $args is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
748
    {
749 144
        $this->do_close();
750 144
    }
751
752
    /**
753
     * @param string $virtual_host
754
     * @param string $capabilities
755
     * @param bool $insist
756
     * @return mixed
757
     */
758 180
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
759
    {
760 180
        $args = new AMQPWriter();
761 180
        $args->write_shortstr($virtual_host);
762 180
        $args->write_shortstr($capabilities);
763 180
        $args->write_bits(array($insist));
764 180
        $this->send_method_frame(array(10, 40), $args);
765
766
        $wait = array(
767 180
            $this->waitHelper->get_wait('connection.open_ok')
768 90
        );
769
770 180
        if ($this->protocolVersion == '0.8') {
771
            $wait[] = $this->waitHelper->get_wait('connection.redirect');
772
        }
773
774 180
        return $this->wait($wait);
775
    }
776
777
    /**
778
     * Signals that the connection is ready
779
     *
780
     * @param AMQPReader $args
781
     */
782 180
    protected function connection_open_ok($args)
783
    {
784 180
        $this->known_hosts = $args->read_shortstr();
785 180
        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
786 180
    }
787
788
    /**
789
     * Asks the client to use a different server
790
     *
791
     * @param AMQPReader $args
792
     * @return string
793
     */
794
    protected function connection_redirect($args)
795
    {
796
        $host = $args->read_shortstr();
797
        $this->known_hosts = $args->read_shortstr();
798
        $this->debug->debug_msg(sprintf(
799
                'Redirected to [%s], known_hosts [%s]',
800
                $host,
801
                $this->known_hosts
802
            ));
803
804
        return $host;
805
    }
806
807
    /**
808
     * Security mechanism challenge
809
     *
810
     * @param AMQPReader $args
811
     */
812
    protected function connection_secure($args)
813
    {
814
        $challenge = $args->read_longstr();
0 ignored issues
show
Unused Code introduced by
$challenge is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
815
    }
816
817
    /**
818
     * Security mechanism response
819
     *
820
     * @param string $response
821
     */
822
    protected function x_secure_ok($response)
823
    {
824
        $args = new AMQPWriter();
825
        $args->write_longstr($response);
826
        $this->send_method_frame(array(10, 21), $args);
827
    }
828
829
    /**
830
     * Starts connection negotiation
831
     *
832
     * @param AMQPReader $args
833
     */
834 180
    protected function connection_start($args)
835
    {
836 180
        $this->version_major = $args->read_octet();
837 180
        $this->version_minor = $args->read_octet();
838 180
        $this->server_properties = $args->read_table();
0 ignored issues
show
Documentation Bug introduced by
It seems like $args->read_table() can also be of type object<PhpAmqpLib\Wire\AMQPTable>. However, the property $server_properties is declared as type array. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
839 180
        $this->mechanisms = explode(' ', $args->read_longstr());
840 180
        $this->locales = explode(' ', $args->read_longstr());
841
842 180
        $this->debug->debug_connection_start(
843 180
            $this->version_major,
844 180
            $this->version_minor,
845 180
            $this->server_properties,
0 ignored issues
show
Bug introduced by
It seems like $this->server_properties can also be of type object<PhpAmqpLib\Wire\AMQPTable>; however, PhpAmqpLib\Helper\DebugH...ebug_connection_start() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
846 180
            $this->mechanisms,
847 180
            $this->locales
848 90
        );
849 180
    }
850
851
    /**
852
     * @param AMQPTable|array $clientProperties
853
     * @param string $mechanism
854
     * @param string $response
855
     * @param string $locale
856
     */
857 180
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
858
    {
859 180
        $args = new AMQPWriter();
860 180
        $args->write_table($clientProperties);
861 180
        $args->write_shortstr($mechanism);
862 180
        $args->write_longstr($response);
863 180
        $args->write_shortstr($locale);
864 180
        $this->send_method_frame(array(10, 11), $args);
865 180
    }
866
867
    /**
868
     * Proposes connection tuning parameters
869
     *
870
     * @param AMQPReader $args
871
     */
872 180
    protected function connection_tune($args)
873
    {
874 180
        $v = $args->read_short();
875 180
        if ($v) {
876
            $this->channel_max = $v;
877
        }
878
879 180
        $v = $args->read_long();
880 180
        if ($v) {
881 180
            $this->frame_max = $v;
0 ignored issues
show
Documentation Bug introduced by
The property $frame_max was declared of type integer, but $v is of type string. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
882 90
        }
883
884
        // use server proposed value if not set
885 180
        if ($this->heartbeat === null) {
886
            $this->heartbeat = $args->read_short();
887
        }
888
889 180
        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
890 180
    }
891
892
    /**
893
     * Negotiates connection tuning parameters
894
     *
895
     * @param int $channel_max
896
     * @param int $frame_max
897
     * @param int $heartbeat
898
     */
899 180
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
900
    {
901 180
        $args = new AMQPWriter();
902 180
        $args->write_short($channel_max);
903 180
        $args->write_long($frame_max);
904 180
        $args->write_short($heartbeat);
905 180
        $this->send_method_frame(array(10, 31), $args);
906 180
        $this->wait_tune_ok = false;
907 180
    }
908
909
    /**
910
     * @return resource
911
     * @deprecated No direct access to communication socket should be available.
912
     */
913
    public function getSocket()
914
    {
915
        return $this->io->getSocket();
916
    }
917
918
    /**
919
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
920
     * @deprecated
921
     */
922
    public function getIO()
923
    {
924
        return $this->io;
925
    }
926
927
    /**
928
     * Check connection heartbeat if enabled.
929
     * @throws AMQPHeartbeatMissedException If too much time passed since last connection activity.
930
     * @throws AMQPConnectionClosedException If connection was closed due to network issues or timeouts.
931
     * @throws AMQPSocketException If connection was already closed.
932
     * @throws AMQPTimeoutException If heartbeat write takes too much time.
933
     * @throws AMQPIOException If other connection problems occurred.
934
     */
935
    public function checkHeartBeat()
936
    {
937
        $this->io->check_heartbeat();
938
    }
939
940
    /**
941
     * Handles connection blocked notifications
942
     *
943
     * @param AMQPReader $args
944
     */
945
    protected function connection_blocked(AMQPReader $args)
946
    {
947
        // Call the block handler and pass in the reason
948
        $this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr()));
949
    }
950
951
    /**
952
     * Handles connection unblocked notifications
953
     *
954
     * @param AMQPReader $args
955
     */
956
    protected function connection_unblocked(AMQPReader $args)
0 ignored issues
show
Unused Code introduced by
The parameter $args is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
957
    {
958
        // No args to an unblock event
959
        $this->dispatch_to_handler($this->connection_unblock_handler, array());
960
    }
961
962
    /**
963
     * Sets a handler which is called whenever a connection.block is sent from the server
964
     *
965
     * @param callable $callback
966
     */
967
    public function set_connection_block_handler($callback)
968
    {
969
        $this->connection_block_handler = $callback;
970
    }
971
972
    /**
973
     * Sets a handler which is called whenever a connection.block is sent from the server
974
     *
975
     * @param callable $callback
976
     */
977
    public function set_connection_unblock_handler($callback)
978
    {
979
        $this->connection_unblock_handler = $callback;
980
    }
981
982
    /**
983
     * Gets the connection status
984
     *
985
     * @return bool
986
     */
987 186
    public function isConnected()
988
    {
989 186
        return (bool) $this->is_connected;
990
    }
991
992
    /**
993
     * Set the connection status
994
     *
995
     * @param bool $is_connected
996
     */
997 186
    protected function setIsConnected($is_connected)
998
    {
999 186
        $this->is_connected = (bool) $is_connected;
1000 186
    }
1001
1002
    /**
1003
     * Closes all available channels
1004
     */
1005 150
    protected function closeChannels()
1006
    {
1007 150
        foreach ($this->channels as $key => $channel) {
1008
            // channels[0] is this connection object, so don't close it yet
1009 144
            if ($key === 0) {
1010 144
                continue;
1011
            }
1012
            try {
1013 42
                $channel->close();
1014 35
            } catch (\Exception $e) {
1015
                /* Ignore closing errors */
1016
            }
1017 75
        }
1018 150
    }
1019
1020
    /**
1021
     * Should the connection be attempted during construction?
1022
     *
1023
     * @return bool
1024
     */
1025 162
    public function connectOnConstruct()
1026
    {
1027 162
        return true;
1028
    }
1029
1030
    /**
1031
     * @return array
1032
     */
1033
    public function getServerProperties()
1034
    {
1035
        return $this->server_properties;
1036
    }
1037
1038
    /**
1039
     * Get the library properties for populating the client protocol information
1040
     *
1041
     * @return array
1042
     */
1043 192
    public function getLibraryProperties()
1044
    {
1045 192
        return self::$LIBRARY_PROPERTIES;
1046
    }
1047
1048 12
    public static function create_connection($hosts, $options = array()){
1049 12
        $latest_exception = null;
1050 12
        foreach ($hosts as $hostdef) {
1051 12
            AbstractConnection::validate_host($hostdef);
1052 12
            $host = $hostdef['host'];
1053 12
            $port = $hostdef['port'];
1054 12
            $user = $hostdef['user'];
1055 12
            $password = $hostdef['password'];
1056 12
            $vhost = isset($hostdef['vhost']) ? $hostdef['vhost'] : "/";
1057
            try {
1058 12
                $conn = static::try_create_connection($host, $port, $user, $password, $vhost, $options);
0 ignored issues
show
Bug introduced by
The method try_create_connection() does not exist on PhpAmqpLib\Connection\AbstractConnection. Did you maybe mean connect()?

This check marks calls to methods that do not seem to exist on an object.

This is most likely the result of a method being renamed without all references to it being renamed likewise.

Loading history...
1059 12
                return $conn;
1060
            } catch (\Exception $e) {
1061
                $latest_exception = $e;
1062
            }
1063
        }
1064
        throw $latest_exception;
1065
    }
1066
1067
    public static function validate_host($host) {
1068
        if(!isset($host['host'])){
1069
            throw new \InvalidArgumentException("'host' key is required.");
1070
        }
1071
        if(!isset($host['port'])){
1072
            throw new \InvalidArgumentException("'port' key is required.");
1073
        }
1074
        if(!isset($host['user'])){
1075
            throw new \InvalidArgumentException("'user' key is required.");
1076
        }
1077
        if(!isset($host['password'])){
1078
            throw new \InvalidArgumentException("'password' key is required.");
1079
        }
1080
    }
1081
}
1082