Completed
Push — master ( ff1afe...c244f9 )
by
unknown
16:11 queued 14:40
created

AbstractConnection::close()   A

Complexity

Conditions 4
Paths 6

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 4.0105

Importance

Changes 0
Metric Value
dl 0
loc 31
ccs 21
cts 23
cp 0.913
rs 9.424
c 0
b 0
f 0
cc 4
nc 6
nop 3
crap 4.0105
1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AbstractChannel;
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
7
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
8
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
9
use PhpAmqpLib\Exception\AMQPNoDataException;
10
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
11
use PhpAmqpLib\Exception\AMQPRuntimeException;
12
use PhpAmqpLib\Exception\AMQPTimeoutException;
13
use PhpAmqpLib\Wire\AMQPReader;
14
use PhpAmqpLib\Wire\AMQPTable;
15
use PhpAmqpLib\Wire\AMQPWriter;
16
use PhpAmqpLib\Wire\IO\AbstractIO;
17
18
class AbstractConnection extends AbstractChannel
19
{
20
    /**
21
     * @var array
22
     * @internal
23
     */
24
    public static $LIBRARY_PROPERTIES = array(
25
        'product' => array('S', 'AMQPLib'),
26
        'platform' => array('S', 'PHP'),
27
        'version' => array('S', '2.9'),
28
        'information' => array('S', ''),
29
        'copyright' => array('S', ''),
30
        'capabilities' => array(
31
            'F',
32
            array(
33
                'authentication_failure_close' => array('t', true),
34
                'publisher_confirms' => array('t', true),
35
                'consumer_cancel_notify' => array('t', true),
36
                'exchange_exchange_bindings' => array('t', true),
37
                'basic.nack' => array('t', true),
38
                'connection.blocked' => array('t', true)
39
            )
40
        )
41
    );
42
43
    /**
44
     * @var AMQPChannel[]
45
     * @internal
46
     */
47
    public $channels = array();
48
49
    /** @var int */
50
    protected $version_major;
51
52
    /** @var int */
53
    protected $version_minor;
54
55
    /** @var array */
56
    protected $server_properties;
57
58
    /** @var array */
59
    protected $mechanisms;
60
61
    /** @var array */
62
    protected $locales;
63
64
    /** @var bool */
65
    protected $wait_tune_ok;
66
67
    /** @var string */
68
    protected $known_hosts;
69
70
    /** @var AMQPReader */
71
    protected $input;
72
73
    /** @var string */
74
    protected $vhost;
75
76
    /** @var bool */
77
    protected $insist;
78
79
    /** @var string */
80
    protected $login_method;
81
82
    /** @var string */
83
    protected $login_response;
84
85
    /** @var string */
86
    protected $locale;
87
88
    /** @var int */
89
    protected $heartbeat;
90
91
    /** @var float */
92
    protected $last_frame;
93
94
    /** @var int */
95
    protected $channel_max = 65535;
96
97
    /** @var int */
98
    protected $frame_max = 131072;
99
100
     /** @var array Constructor parameters for clone */
101
    protected $construct_params;
102
103
    /** @var bool Close the connection in destructor */
104
    protected $close_on_destruct = true;
105
106
    /** @var bool Maintain connection status */
107
    protected $is_connected = false;
108
109
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
110
    protected $io;
111
112
    /** @var \PhpAmqpLib\Wire\AMQPReader */
113
    protected $wait_frame_reader;
114
115
    /** @var callable Handles connection blocking from the server */
116
    private $connection_block_handler;
117
118
    /** @var callable Handles connection unblocking from the server */
119
    private $connection_unblock_handler;
120
121
    /** @var int Connection timeout value*/
122
    protected $connection_timeout ;
123
124
    /**
125
     * Circular buffer to speed up prepare_content().
126
     * Max size limited by $prepare_content_cache_max_size.
127
     *
128
     * @var array
129
     * @see prepare_content()
130
     */
131
    private $prepare_content_cache = array();
132
133
    /** @var int Maximal size of $prepare_content_cache */
134
    private $prepare_content_cache_max_size = 100;
135
136
    /**
137
     * Maximum time to wait for channel operations, in seconds
138
     * @var float $channel_rpc_timeout
139
     */
140
    private $channel_rpc_timeout;
141
142
    /**
     * Stores the connection settings, derives the login response from the
     * credentials and connects right away when connectOnConstruct() says so.
     *
     * @param string $user
     * @param string $password
     * @param string $vhost
     * @param bool $insist
     * @param string $login_method
     * @param string|null $login_response Always replaced below: rebuilt from $user/$password, or reset to null
     * @param string $locale
     * @param AbstractIO $io
     * @param int $heartbeat
     * @param int $connection_timeout
     * @param float $channel_rpc_timeout
     * @throws \Exception
     */
    public function __construct(
        $user,
        $password,
        $vhost = '/',
        $insist = false,
        $login_method = 'AMQPLAIN',
        $login_response = null,
        $locale = 'en_US',
        AbstractIO $io,
        $heartbeat = 60,
        $connection_timeout = 0,
        $channel_rpc_timeout = 0.0
    ) {
        // save the params for the use of __clone
        $this->construct_params = func_get_args();

        $this->wait_frame_reader = new AMQPReader(null);
        $this->vhost = $vhost;
        $this->insist = $insist;
        $this->login_method = $login_method;
        $this->login_response = $login_response;
        $this->locale = $locale;
        $this->io = $io;
        $this->heartbeat = $heartbeat;
        $this->connection_timeout = $connection_timeout;
        $this->channel_rpc_timeout = $channel_rpc_timeout;

        if ($user && $password) {
            // Serialise the credentials with a local writer so that the
            // $login_response property only ever holds a string or null.
            $credentials = new AMQPWriter();
            $credentials->write_table(array(
                'LOGIN' => array('S', $user),
                'PASSWORD' => array('S', $password)
            ));

            // Skip the length: drop the first 4 bytes of the serialised table
            $responseValue = $credentials->getvalue();
            $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
        } else {
            $this->login_response = null;
        }

        // Lazy Connection waits on connecting
        if ($this->connectOnConstruct()) {
            $this->connect();
        }
    }
203
204
    /**
     * Connects to the AMQP server.
     *
     * Opens the socket and runs the handshake (connection.start, tune, open).
     * The whole sequence is repeated for as long as x_open() returns a
     * non-empty value, which this method treats as a redirect.
     *
     * @throws \Exception Rethrown after the connection state has been torn down
     */
    protected function connect()
    {
        try {
            // Keep trying until a connection sticks
            while (!$this->isConnected()) {
                // Flag the connection as up in advance; reverted on redirect or failure
                $this->setIsConnected(true);

                // Connect the socket
                $this->io->connect();

                // The connection object itself is treated as channel 0
                $this->channels = array();
                parent::__construct($this, 0);

                $this->input = new AMQPReader(null, $this->io);

                $this->write($this->amqp_protocol_header);

                $expectedStart = array($this->waitHelper->get_wait('connection.start'));
                $this->wait($expectedStart, false, $this->connection_timeout);

                $this->x_start_ok(
                    $this->getLibraryProperties(),
                    $this->login_method,
                    $this->login_response,
                    $this->locale
                );

                // Keep waiting for secure/tune frames until wait_tune_ok is cleared
                $this->wait_tune_ok = true;
                do {
                    $this->wait(array(
                        $this->waitHelper->get_wait('connection.secure'),
                        $this->waitHelper->get_wait('connection.tune')
                    ));
                } while ($this->wait_tune_ok);

                $redirectHost = $this->x_open($this->vhost, '', $this->insist);
                if (!$redirectHost) {
                    // Not redirected: the connection is established
                    $this->io->reenableHeartbeat();
                    return null;
                }

                // Redirected: drop channels and socket, then loop and try again
                $this->setIsConnected(false);
                $this->closeChannels();
                $this->close_socket();
            }
        } catch (\Exception $e) {
            // Something went wrong: reset the connection status and release resources
            $this->setIsConnected(false);
            $this->closeChannels();
            $this->close_input();
            $this->close_socket();
            throw $e; // Rethrow exception
        }
    }
264
265
    /**
     * Reconnects using the original connection settings.
     *
     * Channels that were established previously are not recreated.
     */
    public function reconnect()
    {
        // Best-effort close of the AMQP connection
        $this->safeClose();

        // Drop the socket/stream before reconnecting
        $this->io->close();

        // getIO can initiate the connection setting via LazyConnection, so force the flag here
        $this->setIsConnected(false);

        $this->connect();
    }
278
279
    /**
     * Cloning re-runs the constructor with the saved arguments, which makes
     * a new connection to the same server.
     */
    public function __clone()
    {
        $constructor = array($this, '__construct');
        call_user_func_array($constructor, $this->construct_params);
    }
286
287 12
    /**
     * Closes the connection on destruction unless close_on_destruct was disabled.
     */
    public function __destruct()
    {
        if (!$this->close_on_destruct) {
            return;
        }

        $this->safeClose();
    }
293
294
    /**
     * Attempts to close the connection safely.
     *
     * Does nothing when there is no input reader; any exception raised while
     * closing is swallowed.
     */
    protected function safeClose()
    {
        try {
            if (!empty($this->input)) {
                $this->close();
            }
        } catch (\Exception $e) {
            // Intentionally ignored: closing here is best effort
        }
    }
307
308
    /**
     * Delegates to the IO layer's select() and updates the connection state
     * when it fails.
     *
     * @param int $sec
     * @param int $usec
     * @return mixed
     * @throws AMQPConnectionClosedException After the connection has been torn down
     * @throws AMQPRuntimeException After the connection has been flagged as disconnected
     */
    public function select($sec, $usec = 0)
    {
        try {
            $ready = $this->io->select($sec, $usec);
        } catch (AMQPConnectionClosedException $closed) {
            $this->do_close();
            throw $closed;
        } catch (AMQPRuntimeException $failure) {
            $this->setIsConnected(false);
            throw $failure;
        }

        return $ready;
    }
325
326
    /**
     * Controls whether the destructor closes the connection.
     *
     * Useful after a fork, when the parent process connection must stay open.
     *
     * @param bool $close
     */
    public function set_close_on_destruct($close = true)
    {
        $shouldClose = (bool) $close;
        $this->close_on_destruct = $shouldClose;
    }
336
337 150
    /**
     * Closes the input reader, if any, and clears the reference to it.
     */
    protected function close_input()
    {
        if ($this->debug) {
            $this->debug->debug_msg('closing input');
        }

        if ($this->input === null) {
            return;
        }

        $this->input->close();
        $this->input = null;
    }
346
347 150
    /**
     * Closes the underlying IO object when one is set.
     */
    protected function close_socket()
    {
        if ($this->debug) {
            $this->debug->debug_msg('closing socket');
        }

        if (!$this->io) {
            return;
        }

        $this->io->close();
    }
355
356
    /**
     * Writes raw data through the IO layer, after hex-dumping it to the
     * debug helper.
     *
     * @param string $data
     * @throws AMQPConnectionClosedException After the connection has been torn down
     * @throws AMQPRuntimeException After the connection has been flagged as disconnected
     */
    public function write($data)
    {
        $this->debug->debug_hexdump($data);

        try {
            $this->io->write($data);
        } catch (AMQPConnectionClosedException $closed) {
            // The peer is gone: release reader and socket before propagating
            $this->do_close();
            throw $closed;
        } catch (AMQPRuntimeException $failure) {
            $this->setIsConnected(false);
            throw $failure;
        }
    }
373
374 144
    /**
     * Marks the connection as disconnected, then closes the input reader and
     * the socket.
     */
    protected function do_close()
    {
        $this->setIsConnected(false);

        // Reader first, then the socket
        $this->close_input();
        $this->close_socket();
    }
380
381
    /**
     * Returns the lowest channel id in 1..channel_max that is not in use.
     *
     * @return int
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When every id is taken
     */
    public function get_free_channel_id()
    {
        $candidate = 1;
        while ($candidate <= $this->channel_max) {
            if (!isset($this->channels[$candidate])) {
                return $candidate;
            }
            $candidate++;
        }

        throw new AMQPRuntimeException('No free channel ids');
    }
395
396
    /**
     * Builds the content frames for a message and writes them to the server.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter|null $pkt Writer to append to; a new one is created when omitted
     */
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        // Use the returned writer: when $pkt is null prepare_content() creates
        // a fresh one, and calling getvalue() on the null argument would fail.
        $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
        $this->write($pkt->getvalue());
    }
410
411
    /**
     * Returns a new AMQPWriter or mutates the provided $pkt.
     *
     * Appends one header frame (type 2) followed by as many body frames
     * (type 3) as needed to carry $body in chunks of frame_max - 8 bytes.
     * The fixed start of the header frame is cached per
     * channel/properties/class/weight combination.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter|null $pkt
     * @return AMQPWriter
     */
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        // Content already prepared ?
        $cacheKey = sprintf('%s|%s|%s|%s', $channel, $packed_properties, $class_id, $weight);

        if (!isset($this->prepare_content_cache[$cacheKey])) {
            $header = new AMQPWriter();
            $header->write_octet(2);
            $header->write_short($channel);
            // 12 = class_id (2) + weight (2) + body_size (8), written around the properties
            $header->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $header->write_short($class_id);
            $header->write_short($weight);
            $this->prepare_content_cache[$cacheKey] = $header->getvalue();

            // Keep the cache bounded by evicting its first (oldest inserted) entry
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                reset($this->prepare_content_cache);
                $oldestKey = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$oldestKey]);
            }
        }

        $pkt->write($this->prepare_content_cache[$cacheKey]);
        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);
        $pkt->write_octet(0xCE);

        // memory efficiency: walk the string instead of biting it. good for
        // very large packets (close in size to memory_limit setting)
        $chunkSize = $this->frame_max - 8;
        $bodyLength = mb_strlen($body, 'ASCII');
        for ($offset = 0; $offset < $bodyLength; $offset += $chunkSize) {
            $chunk = mb_substr($body, $offset, $chunkSize, 'ASCII');

            $pkt->write_octet(3);
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($chunk, 'ASCII'));
            $pkt->write($chunk);
            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }
478
479
    /**
     * Builds a method frame for the given channel and writes it to the server.
     *
     * @param string $channel
     * @param array $method_sig Pair of (class_id, method_id)
     * @param AMQPWriter|string $args
     * @param AMQPWriter|null $pkt Writer to append to; a new one is created when omitted
     */
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $frame = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
        $this->write($frame->getvalue());

        $this->debug->debug_method_signature1($method_sig);
    }
491
492
    /**
     * Returns a new AMQPWriter or mutates the provided $pkt.
     *
     * Appends a single method frame (type 1): channel, payload length,
     * class id, method id, the serialised arguments and the 0xCE terminator.
     *
     * @param string $channel
     * @param array $method_sig Pair of (class_id, method_id)
     * @param AMQPWriter|string $args
     * @param AMQPWriter|null $pkt
     * @return AMQPWriter
     */
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $arguments = $args instanceof AMQPWriter ? $args->getvalue() : $args;

        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        $pkt->write_octet(1);
        $pkt->write_short($channel);
        // 4 = length of class_id and method_id in payload
        $pkt->write_long(mb_strlen($arguments, 'ASCII') + 4);

        list($classId, $methodId) = $method_sig;
        $pkt->write_short($classId);
        $pkt->write_short($methodId);
        $pkt->write($arguments);

        $pkt->write_octet(0xCE);

        $this->debug->debug_method_signature1($method_sig);

        return $pkt;
    }
524
525
    /**
     * Waits for a frame from the server.
     *
     * The reader timeout is set to $timeout for the duration of the read and
     * put back afterwards, both on success and on timeout / no-data errors.
     *
     * @param int|float|null $timeout
     * @return array Triple of (frame type, channel, payload)
     * @throws \Exception
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    protected function wait_frame($timeout = 0)
    {
        if ($this->input === null) {
            $this->setIsConnected(false);
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }

        $previousTimeout = $this->input->getTimeout();
        $this->input->setTimeout($timeout);

        try {
            // frame_type + channel_id + size
            $headerLength = AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG;
            $this->wait_frame_reader->reuse($this->input->read($headerLength));

            $frame_type = $this->wait_frame_reader->read_octet();
            $constants = self::$PROTOCOL_CONSTANTS_CLASS;
            if (!array_key_exists($frame_type, $constants::$FRAME_TYPES)) {
                throw new AMQPInvalidFrameException('Invalid frame type ' . $frame_type);
            }
            $channel = $this->wait_frame_reader->read_short();
            $size = $this->wait_frame_reader->read_long();

            // payload + trailing frame-end octet
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

            $payload = $this->wait_frame_reader->read($size);
            $frameEnd = $this->wait_frame_reader->read_octet();
        } catch (AMQPTimeoutException $e) {
            $this->input->setTimeout($previousTimeout);
            throw $e;
        } catch (AMQPNoDataException $e) {
            // The reader may already be gone at this point
            if ($this->input) {
                $this->input->setTimeout($previousTimeout);
            }
            throw $e;
        } catch (AMQPConnectionClosedException $exception) {
            $this->do_close();
            throw $exception;
        }

        $this->input->setTimeout($previousTimeout);

        if ($frameEnd != 0xCE) {
            throw new AMQPInvalidFrameException(sprintf(
                'Framing error, unexpected byte: %x',
                $frameEnd
            ));
        }

        return array($frame_type, $channel, $payload);
    }
589
590
    /**
     * Waits for a frame from the server destined for a particular channel.
     *
     * Heartbeat frames (type 8 on channel 0) are skipped and the time spent
     * waiting for them is deducted from the remaining timeout. Frames for
     * other channels are queued on those channels.
     *
     * @param string $channel_id
     * @param int|float|null $timeout
     * @return array Pair of (frame type, payload)
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPHeartbeatMissedException
     */
    protected function wait_channel($channel_id, $timeout = 0)
    {
        // Keeping the original timeout unchanged.
        $_timeout = $timeout;
        while (true) {
            // Sub-second precision, so float timeouts are honoured
            $start = microtime(true);
            try {
                list($frame_type, $frame_channel, $payload) = $this->wait_frame($_timeout);
            } catch (AMQPTimeoutException $e) {
                // Only report a missed heartbeat once at least one frame has
                // been seen; comparing against a null last_frame is always true.
                if ($this->heartbeat
                    && $this->last_frame
                    && microtime(true) - ($this->heartbeat * 2) > $this->last_frame
                ) {
                    $this->debug->debug_msg("missed server heartbeat (at threshold * 2)");
                    $this->setIsConnected(false);
                    throw new AMQPHeartbeatMissedException("Missed server heartbeat");
                }

                throw $e;
            }

            $this->last_frame = microtime(true);

            if ($frame_channel === 0 && $frame_type === 8) {
                // skip heartbeat frames and reduce the timeout by the time passed
                $this->debug->debug_msg("received server heartbeat");
                if ($_timeout > 0) {
                    $_timeout -= $this->last_frame - $start;
                    if ($_timeout <= 0) {
                        // If timeout has been reached, throw the exception without calling wait_frame
                        throw new AMQPTimeoutException("Timeout waiting on channel");
                    }
                }
                continue;
            }

            if ($frame_channel == $channel_id) {
                return array($frame_type, $payload);
            }

            // Not the channel we were looking for.  Queue this frame
            // for later, when the other channel is looking for frames.
            // Make sure the channel still exists, it could have been
            // closed by a previous Exception.
            if (isset($this->channels[$frame_channel])) {
                array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
            }

            // If we just queued up a method for channel 0 (the Connection
            // itself) it's probably a close method in reaction to some
            // error, so deal with it right away.
            if (($frame_type == 1) && ($frame_channel == 0)) {
                $this->wait();
            }
        }
    }
652
653
    /**
     * Fetches the channel object registered under $channel_id, creating it
     * when it does not exist yet. Without an id, the next free one is used.
     *
     * @param int|null $channel_id
     * @return AMQPChannel
     */
    public function channel($channel_id = null)
    {
        if (isset($this->channels[$channel_id])) {
            return $this->channels[$channel_id];
        }

        $id = $channel_id ?: $this->get_free_channel_id();

        $created = new AMQPChannel($this->connection, $id, true, $this->channel_rpc_timeout);
        $this->channels[$id] = $created;

        return $created;
    }
672
673
    /**
     * Requests a connection close.
     *
     * Heartbeats are disabled first. Nothing else happens when there is no
     * protocol writer or the connection is already down. Otherwise all
     * channels are closed, connection.close is sent and the close-ok reply
     * is awaited.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig
     * @return mixed|null
     * @throws \Exception Rethrown after the connection has been torn down
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        $this->io->disableHeartbeat();

        if (empty($this->protocolWriter) || !$this->isConnected()) {
            return null;
        }

        try {
            $this->closeChannels();

            list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
                $reply_code,
                $reply_text,
                $method_sig[0],
                $method_sig[1]
            );
            $this->send_method_frame(array($class_id, $method_id), $args);

            $expected = array($this->waitHelper->get_wait('connection.close_ok'));
            $reply = $this->wait($expected, false, $this->connection_timeout);
        } catch (\Exception $exception) {
            $this->do_close();
            throw $exception;
        }

        $this->setIsConnected(false);

        return $reply;
    }
712
713
    /**
     * Handles a close request from the server: reads the reason, confirms
     * the close and reports it as an exception.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException Always
     */
    protected function connection_close(AMQPReader $reader)
    {
        // Fields are read in wire order
        $reply_code = $reader->read_short();
        $reply_text = $reader->read_shortstr();
        $failedMethod = array($reader->read_short(), $reader->read_short());

        $this->x_close_ok();

        throw new AMQPProtocolConnectionException($reply_code, $reply_text, $failedMethod);
    }
728
729
    /**
     * Confirms a connection close, then tears the connection down.
     */
    protected function x_close_ok()
    {
        // get_wait() yields the signature as a comma-separated string
        $closeOkSignature = explode(',', $this->waitHelper->get_wait('connection.close_ok'));
        $this->send_method_frame($closeOkSignature);

        $this->do_close();
    }
739
740
    /**
     * Handles the server's confirmation of a connection close by tearing
     * the connection down.
     *
     * @param AMQPReader $args Not read by this handler
     */
    protected function connection_close_ok($args)
    {
        $this->do_close();
    }
749
750
    /**
     * Sends connection.open (10, 40) and waits for the server's answer.
     *
     * With protocol version 0.8 a connection.redirect reply is accepted in
     * addition to connection.open_ok.
     *
     * @param string $virtual_host
     * @param string $capabilities
     * @param bool $insist
     * @return mixed Result of wait()
     */
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $payload = new AMQPWriter();
        $payload->write_shortstr($virtual_host);
        $payload->write_shortstr($capabilities);
        $payload->write_bits(array($insist));
        $this->send_method_frame(array(10, 40), $payload);

        $expected = array($this->waitHelper->get_wait('connection.open_ok'));
        if ($this->protocolVersion == '0.8') {
            $expected[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($expected);
    }
774
775
    /**
     * Signals that the connection is ready; records the known hosts sent by
     * the server.
     *
     * @param AMQPReader $args
     */
    protected function connection_open_ok($args)
    {
        $knownHosts = $args->read_shortstr();
        $this->known_hosts = $knownHosts;

        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
    }
785
786
    /**
     * Asks the client to use a different server.
     *
     * @param AMQPReader $args
     * @return string Host to connect to instead
     */
    protected function connection_redirect($args)
    {
        // Fields are read in wire order: target host, then known hosts
        $target = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();

        $message = sprintf('Redirected to [%s], known_hosts [%s]', $target, $this->known_hosts);
        $this->debug->debug_msg($message);

        return $target;
    }
804
805
    /**
     * Security mechanism challenge.
     *
     * The challenge is read from the frame but not acted upon.
     *
     * @param AMQPReader $args
     */
    protected function connection_secure($args)
    {
        // Still consume the challenge from the reader; the value itself is unused
        $args->read_longstr();
    }
814
815
    /**
     * Security mechanism response: sends method (10, 21) carrying $response.
     *
     * @param string $response
     */
    protected function x_secure_ok($response)
    {
        $payload = new AMQPWriter();
        $payload->write_longstr($response);

        $this->send_method_frame(array(10, 21), $payload);
    }
826
827
    /**
828
     * Starts connection negotiation
829
     *
830
     * @param AMQPReader $args
831
     */
832 180
    protected function connection_start($args)
833
    {
834 180
        $this->version_major = $args->read_octet();
835 180
        $this->version_minor = $args->read_octet();
836 180
        $this->server_properties = $args->read_table();
0 ignored issues
show
Documentation Bug introduced by
It seems like $args->read_table() can also be of type object<PhpAmqpLib\Wire\AMQPTable>. However, the property $server_properties is declared as type array. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
837 180
        $this->mechanisms = explode(' ', $args->read_longstr());
838 180
        $this->locales = explode(' ', $args->read_longstr());
839
840 180
        $this->debug->debug_connection_start(
841 180
            $this->version_major,
842 180
            $this->version_minor,
843 180
            $this->server_properties,
0 ignored issues
show
Bug introduced by
It seems like $this->server_properties can also be of type object<PhpAmqpLib\Wire\AMQPTable>; however, PhpAmqpLib\Helper\DebugH...ebug_connection_start() does only seem to accept array, maybe add an additional type check?

If a method or function can return values of several different types, and you are not sure that only a single type can occur in this context, we recommend adding an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this is a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
844 180
            $this->mechanisms,
845 180
            $this->locales
846 90
        );
847 180
    }
848
849
    /**
     * Sends the client's answer to the connection start in a method frame
     * with the signature (10, 11).
     *
     * The arguments are written in this order: client properties (table),
     * mechanism (short string), response (long string), locale (short string).
     *
     * @param AMQPTable|array $clientProperties
     * @param string $mechanism
     * @param string $response
     * @param string $locale
     */
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
    {
        $payload = new AMQPWriter();

        $payload->write_table($clientProperties);
        $payload->write_shortstr($mechanism);
        $payload->write_longstr($response);
        $payload->write_shortstr($locale);

        $signature = array(10, 11);
        $this->send_method_frame($signature, $payload);
    }
864
865
    /**
866
     * Proposes connection tuning parameters
867
     *
868
     * @param AMQPReader $args
869
     */
870 180
    protected function connection_tune($args)
871
    {
872 180
        $v = $args->read_short();
873 180
        if ($v) {
874
            $this->channel_max = $v;
875
        }
876
877 180
        $v = $args->read_long();
878 180
        if ($v) {
879 180
            $this->frame_max = $v;
0 ignored issues
show
Documentation Bug introduced by
The property $frame_max was declared of type integer, but $v is of type string. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
880 90
        }
881
882
        // use server proposed value if not set
883 180
        if ($this->heartbeat === null) {
884
            $this->heartbeat = $args->read_short();
885
        }
886
887 180
        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
888 180
    }
889
890
    /**
     * Negotiates connection tuning parameters.
     *
     * Writes the three values (short, long, short) into a method frame with
     * the signature (10, 31), sends it, and then clears the wait_tune_ok flag.
     *
     * @param int $channel_max
     * @param int $frame_max
     * @param int $heartbeat
     */
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        $payload = new AMQPWriter();

        $payload->write_short($channel_max);
        $payload->write_long($frame_max);
        $payload->write_short($heartbeat);

        $signature = array(10, 31);
        $this->send_method_frame($signature, $payload);

        // The flag is only cleared once the frame has been handed off.
        $this->wait_tune_ok = false;
    }
906
907
    /**
     * Returns the socket exposed by the underlying IO object.
     *
     * @return resource
     * @deprecated No direct access to communication socket should be available.
     */
    public function getSocket()
    {
        $transport = $this->io;

        return $transport->getSocket();
    }
915
916
    /**
     * Returns the IO object this connection communicates through.
     *
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
     * @deprecated
     */
    public function getIO()
    {
        return $this->io;
    }
924
925
    /**
     * Handles connection blocked notifications.
     *
     * Reads the reason (a short string) from the frame arguments and passes
     * it as the only argument to the registered block handler.
     *
     * @param AMQPReader $args
     */
    protected function connection_blocked(AMQPReader $args)
    {
        $reason = $args->read_shortstr();
        $handlerArguments = array($reason);

        $this->dispatch_to_handler($this->connection_block_handler, $handlerArguments);
    }
935
936
    /**
937
     * Handles connection unblocked notifications
938
     *
939
     * @param AMQPReader $args
940
     */
941
    protected function connection_unblocked(AMQPReader $args)
0 ignored issues
show
Unused Code introduced by
The parameter $args is not used and could be removed.

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
942
    {
943
        // No args to an unblock event
944
        $this->dispatch_to_handler($this->connection_unblock_handler, array());
945
    }
946
947
    /**
     * Registers the callback that connection_blocked() dispatches to,
     * i.e. the handler invoked when the server reports the connection as blocked.
     *
     * @param callable $callback
     */
    public function set_connection_block_handler($callback)
    {
        $handler = $callback;
        $this->connection_block_handler = $handler;
    }
956
957
    /**
     * Sets a handler which is called whenever a connection.unblocked is sent from the server
     *
     * The callback is the one connection_unblocked() dispatches to; it is
     * invoked with no arguments.
     *
     * @param callable $callback
     */
    public function set_connection_unblock_handler($callback)
    {
        $this->connection_unblock_handler = $callback;
    }
966
967
    /**
     * Reports the connection status as a strict boolean.
     *
     * @return bool
     */
    public function isConnected()
    {
        return $this->is_connected ? true : false;
    }
976
977
    /**
     * Stores the connection status, normalised to a strict boolean.
     *
     * @param bool $is_connected
     */
    protected function setIsConnected($is_connected)
    {
        $this->is_connected = $is_connected ? true : false;
    }
986
987
    /**
     * Closes every registered channel except the entry at key 0.
     *
     * Exceptions raised while closing an individual channel are swallowed,
     * so the remaining channels are still processed.
     */
    protected function closeChannels()
    {
        foreach ($this->channels as $channelId => $openChannel) {
            // channels[0] is this connection object, so it is not closed here
            if ($channelId !== 0) {
                try {
                    $openChannel->close();
                } catch (\Exception $ignored) {
                    /* Ignore closing errors */
                }
            }
        }
    }
1004
1005
    /**
     * Tells whether the connection should be attempted during construction.
     *
     * This implementation always answers yes.
     *
     * @return bool
     */
    public function connectOnConstruct()
    {
        return true;
    }
1014
1015
    /**
     * Returns the server properties stored by connection_start().
     *
     * @return array
     */
    public function getServerProperties()
    {
        $properties = $this->server_properties;

        return $properties;
    }
1022
1023
    /**
     * Get the library properties for populating the client protocol information.
     *
     * Returns the static $LIBRARY_PROPERTIES table of this class.
     *
     * @return array
     */
    public function getLibraryProperties()
    {
        $properties = self::$LIBRARY_PROPERTIES;

        return $properties;
    }
1032
1033
    /**
     * Tries the given host definitions in order and returns the first
     * connection that could be created.
     *
     * Each entry of $hosts must contain the keys 'host', 'port', 'user' and
     * 'password'; 'vhost' is optional and defaults to "/". An entry that fails
     * validation aborts the whole call immediately, whereas a failed
     * connection attempt moves on to the next entry.
     *
     * @param array $hosts   Zero-indexed list of host definitions.
     * @param array $options Passed through unchanged to try_create_connection().
     * @return mixed Whatever static::try_create_connection() returns for the first host that succeeds.
     * @throws \InvalidArgumentException When $hosts is empty or an entry lacks a required key.
     * @throws \Exception The exception raised for the last host when every attempt failed.
     */
    public static function create_connection($hosts, $options = array())
    {
        // Computed once; also lets us reject an empty list up front instead of
        // falling through to "throw null" below.
        $hostCount = count($hosts);
        if ($hostCount === 0) {
            throw new \InvalidArgumentException('At least one host definition is required.');
        }

        $latest_exception = null;
        for ($i = 0; $i < $hostCount; $i++) {
            $definition = $hosts[$i];

            // Deliberately outside the try block: a malformed definition is a
            // caller error and must not be masked by trying the next host.
            self::validate_host($definition);

            $vhost = isset($definition['vhost']) ? $definition['vhost'] : "/";

            try {
                // NOTE(review): try_create_connection() is not declared in the
                // visible part of this class; it is resolved via late static
                // binding, so the class this is called on is presumably
                // expected to define it - confirm.
                return static::try_create_connection(
                    $definition['host'],
                    $definition['port'],
                    $definition['user'],
                    $definition['password'],
                    $vhost,
                    $options
                );
            } catch (\Exception $e) {
                // Remember the failure and fall through to the next host.
                $latest_exception = $e;
            }
        }

        throw $latest_exception;
    }
1051
1052
    /**
     * Checks that a host definition carries every mandatory key.
     *
     * The keys are checked in the order 'host', 'port', 'user', 'password';
     * the first one that is not set triggers the exception.
     *
     * @param array $host
     * @throws \InvalidArgumentException When a mandatory key is not set.
     */
    public static function validate_host($host)
    {
        $mandatoryKeys = array('host', 'port', 'user', 'password');

        foreach ($mandatoryKeys as $mandatoryKey) {
            if (isset($host[$mandatoryKey])) {
                continue;
            }

            throw new \InvalidArgumentException("'" . $mandatoryKey . "' key is required.");
        }
    }
1066
}
1067