Completed
Push — master ( d75eab...48de7d )
by
unknown
24s
created

AbstractConnection::channel()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3.0261

Importance

Changes 0
Metric Value
dl 0
loc 12
ccs 6
cts 7
cp 0.8571
rs 9.8666
c 0
b 0
f 0
cc 3
nc 3
nop 1
crap 3.0261
1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AbstractChannel;
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
7
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
8
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
9
use PhpAmqpLib\Exception\AMQPNoDataException;
10
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
11
use PhpAmqpLib\Exception\AMQPRuntimeException;
12
use PhpAmqpLib\Exception\AMQPTimeoutException;
13
use PhpAmqpLib\Wire\AMQPReader;
14
use PhpAmqpLib\Wire\AMQPTable;
15
use PhpAmqpLib\Wire\AMQPWriter;
16
use PhpAmqpLib\Wire\IO\AbstractIO;
17
18
class AbstractConnection extends AbstractChannel
19
{
20
    /** @var array */
21
    public static $LIBRARY_PROPERTIES = array(
22
        'product' => array('S', 'AMQPLib'),
23
        'platform' => array('S', 'PHP'),
24
        'version' => array('S', '2.8'),
25
        'information' => array('S', ''),
26
        'copyright' => array('S', ''),
27
        'capabilities' => array(
28
            'F',
29
            array(
30
                'authentication_failure_close' => array('t', true),
31
                'publisher_confirms' => array('t', true),
32
                'consumer_cancel_notify' => array('t', true),
33
                'exchange_exchange_bindings' => array('t', true),
34
                'basic.nack' => array('t', true),
35
                'connection.blocked' => array('t', true)
36
            )
37
        )
38
    );
39
40
    /** @var AMQPChannel[] */
41
    public $channels = array();
42
43
    /** @var int */
44
    protected $version_major;
45
46
    /** @var int */
47
    protected $version_minor;
48
49
    /** @var array */
50
    protected $server_properties;
51
52
    /** @var array */
53
    protected $mechanisms;
54
55
    /** @var array */
56
    protected $locales;
57
58
    /** @var bool */
59
    protected $wait_tune_ok;
60
61
    /** @var string */
62
    protected $known_hosts;
63
64
    /** @var AMQPReader */
65
    protected $input;
66
67
    /** @var string */
68
    protected $vhost;
69
70
    /** @var bool */
71
    protected $insist;
72
73
    /** @var string */
74
    protected $login_method;
75
76
    /** @var string */
77
    protected $login_response;
78
79
    /** @var string */
80
    protected $locale;
81
82
    /** @var int */
83
    protected $heartbeat;
84
85
    /** @var float */
86
    protected $last_frame;
87
88
    /** @var int */
89
    protected $channel_max = 65535;
90
91
    /** @var int */
92
    protected $frame_max = 131072;
93
94
     /** @var array Constructor parameters for clone */
95
    protected $construct_params;
96
97
    /** @var bool Close the connection in destructor */
98
    protected $close_on_destruct = true;
99
100
    /** @var bool Maintain connection status */
101
    protected $is_connected = false;
102
103
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
104
    protected $io;
105
106
    /** @var \PhpAmqpLib\Wire\AMQPReader */
107
    protected $wait_frame_reader;
108
109
    /** @var callable Handles connection blocking from the server */
110
    private $connection_block_handler;
111
112
    /** @var callable Handles connection unblocking from the server */
113
    private $connection_unblock_handler;
114
115
    /** @var int Connection timeout value*/
116
    protected $connection_timeout ;
117
118
    /**
119
     * Circular buffer to speed up prepare_content().
120
     * Max size limited by $prepare_content_cache_max_size.
121
     *
122
     * @var array
123
     * @see prepare_content()
124
     */
125
    private $prepare_content_cache;
126
127
    /** @var int Maximal size of $prepare_content_cache */
128
    private $prepare_content_cache_max_size;
129
130
    /**
131
     * Maximum time to wait for channel operations, in seconds
132
     * @var float $channel_rpc_timeout
133
     */
134
    private $channel_rpc_timeout;
135
136
    /**
137
     * @param string $user
138
     * @param string $password
139
     * @param string $vhost
140
     * @param bool $insist
141
     * @param string $login_method
142
     * @param null $login_response
143
     * @param string $locale
144
     * @param AbstractIO $io
145
     * @param int $heartbeat
146
     * @param int $connection_timeout
147
     * @param float $channel_rpc_timeout
148
     * @throws \Exception
149
     */
150 186
    public function __construct(
151
        $user,
152
        $password,
153
        $vhost = '/',
154
        $insist = false,
155
        $login_method = 'AMQPLAIN',
156
        $login_response = null,
157
        $locale = 'en_US',
158
        AbstractIO $io,
159
        $heartbeat = 60,
160
        $connection_timeout = 0,
161
        $channel_rpc_timeout = 0.0
162
    ) {
163
        // save the params for the use of __clone
164 186
        $this->construct_params = func_get_args();
165
166 186
        $this->wait_frame_reader = new AMQPReader(null);
167 186
        $this->vhost = $vhost;
168 186
        $this->insist = $insist;
169 186
        $this->login_method = $login_method;
170 186
        $this->login_response = $login_response;
171 186
        $this->locale = $locale;
172 186
        $this->io = $io;
173 186
        $this->heartbeat = $heartbeat;
174 186
        $this->connection_timeout = $connection_timeout;
175 186
        $this->channel_rpc_timeout = $channel_rpc_timeout;
176
177 186
        if ($user && $password) {
178 186
            $this->login_response = new AMQPWriter();
0 ignored issues
show
Documentation Bug introduced by
It seems like new \PhpAmqpLib\Wire\AMQPWriter() of type object<PhpAmqpLib\Wire\AMQPWriter> is incompatible with the declared type string of property $login_response.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
179 186
            $this->login_response->write_table(array(
180 186
                'LOGIN' => array('S', $user),
181 186
                'PASSWORD' => array('S', $password)
182 93
            ));
183
184
            // Skip the length
185 186
            $responseValue = $this->login_response->getvalue();
186 186
            $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
187
188 93
        } else {
189
            $this->login_response = null;
190
        }
191
192 186
        $this->prepare_content_cache = array();
193 186
        $this->prepare_content_cache_max_size = 100;
194
195
        // Lazy Connection waits on connecting
196 186
        if ($this->connectOnConstruct()) {
197 162
            $this->connect();
198 78
        }
199 180
    }
200
201
    /**
202
     * Connects to the AMQP server
203
     */
204 186
    protected function connect()
205
    {
206
        try {
207
            // Loop until we connect
208 186
            while (!$this->isConnected()) {
209
                // Assume we will connect, until we dont
210 186
                $this->setIsConnected(true);
211
212
                // Connect the socket
213 186
                $this->getIO()->connect();
214
215 180
                $this->channels = array();
216
                // The connection object itself is treated as channel 0
217 180
                parent::__construct($this, 0);
0 ignored issues
show
Comprehensibility Bug introduced by
It seems like you call parent on a different method (__construct() instead of connect()). Are you sure this is correct? If so, you might want to change this to $this->__construct().

This check looks for a call to a parent method whose name is different than the method from which it is called.

Consider the following code:

class Daddy
{
    protected function getFirstName()
    {
        return "Eidur";
    }

    protected function getSurName()
    {
        return "Gudjohnsen";
    }
}

class Son
{
    public function getFirstName()
    {
        return parent::getSurname();
    }
}

The getFirstName() method in the Son calls the wrong method in the parent class.

Loading history...
218
219 180
                $this->input = new AMQPReader(null, $this->getIO());
220
221 180
                $this->write($this->amqp_protocol_header);
222 180
                $this->wait(array($this->waitHelper->get_wait('connection.start')),false,$this->connection_timeout);
223 180
                $this->x_start_ok(
224 180
                    $this->getLibraryProperties(),
225 180
                    $this->login_method,
226 180
                    $this->login_response,
227 180
                    $this->locale
228 90
                );
229
230 180
                $this->wait_tune_ok = true;
231 180
                while ($this->wait_tune_ok) {
232 180
                    $this->wait(array(
233 180
                        $this->waitHelper->get_wait('connection.secure'),
234 180
                        $this->waitHelper->get_wait('connection.tune')
235 90
                    ));
236 90
                }
237
238 180
                $host = $this->x_open($this->vhost, '', $this->insist);
239 180
                if (!$host) {
240
                    //Reconnected
241 180
                    $this->getIO()->reenableHeartbeat();
242 180
                    return null; // we weren't redirected
243
                }
244
245
                $this->setIsConnected(false);
246
                $this->closeChannels();
247
248
                // we were redirected, close the socket, loop and try again
249
                $this->close_socket();
250
            }
251
252 18
        } catch (\Exception $e) {
253
            // Something went wrong, set the connection status
254 6
            $this->setIsConnected(false);
255 6
            $this->closeChannels();
256 6
            $this->close_input();
257 6
            $this->close_socket();
258 6
            throw $e; // Rethrow exception
259
        }
260 24
    }
261
262
    /**
263
     * Reconnects using the original connection settings.
264
     * This will not recreate any channels that were established previously
265
     */
266 36
    public function reconnect()
267
    {
268
        // Try to close the AMQP connection
269 36
        $this->safeClose();
270
        // Reconnect the socket/stream then AMQP
271 36
        $this->getIO()->reconnect();
272 36
        $this->setIsConnected(false); // getIO can initiate the connection setting via LazyConnection, set it here to be sure
273 36
        $this->connect();
274 36
    }
275
276
    /**
277
     * Cloning will use the old properties to make a new connection to the same server
278
     */
279
    public function __clone()
280
    {
281
        call_user_func_array(array($this, '__construct'), $this->construct_params);
282
    }
283
284 12
    public function __destruct()
285
    {
286 12
        if ($this->close_on_destruct) {
287 12
            $this->safeClose();
288 6
        }
289 12
    }
290
291
    /**
292
     * Attempts to close the connection safely
293
     */
294 48
    protected function safeClose()
295
    {
296
        try {
297 48
            if (isset($this->input) && $this->input) {
298 33
                $this->close();
299 9
            }
300 24
        } catch (\Exception $e) {
301
            // Nothing here
302
        }
303 48
    }
304
305
    /**
306
     * @param int $sec
307
     * @param int $usec
308
     * @return mixed
309
     */
310
    public function select($sec, $usec = 0)
311
    {
312
        return $this->getIO()->select($sec, $usec);
313
    }
314
315
    /**
316
     * Allows to not close the connection
317
     * it's useful after the fork when you don't want to close parent process connection
318
     *
319
     * @param bool $close
320
     */
321
    public function set_close_on_destruct($close = true)
322
    {
323
        $this->close_on_destruct = (bool) $close;
324
    }
325
326 150
    protected function close_input()
327
    {
328 150
        $this->debug && $this->debug->debug_msg('closing input');
329
330 150
        if (!is_null($this->input)) {
331 144
            $this->input->close();
332 144
            $this->input = null;
333 72
        }
334 150
    }
335
336 150
    protected function close_socket()
337
    {
338 150
        $this->debug && $this->debug->debug_msg('closing socket');
339
340 150
        if (!is_null($this->getIO())) {
341 150
            $this->getIO()->close();
342 75
        }
343 150
    }
344
345
    /**
346
     * @param string $data
347
     */
348 180
    public function write($data)
349
    {
350 180
        $this->debug->debug_hexdump($data);
351
352
        try {
353 180
            $this->getIO()->write($data);
354 90
        } catch (AMQPConnectionClosedException $e) {
355
            $this->do_close();
356
            throw $e;
357
        } catch (AMQPRuntimeException $e) {
358
            $this->setIsConnected(false);
359
            throw $e;
360
        }
361 180
    }
362
363 144
    protected function do_close()
364
    {
365 144
        $this->setIsConnected(false);
366 144
        $this->close_input();
367 144
        $this->close_socket();
368 144
    }
369
370
    /**
371
     * @return int
372
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
373
     */
374 174
    public function get_free_channel_id()
375
    {
376 174
        for ($i = 1; $i <= $this->channel_max; $i++) {
377 174
            if (!isset($this->channels[$i])) {
378 174
                return $i;
379
            }
380 9
        }
381
382
        throw new AMQPRuntimeException('No free channel ids');
383
    }
384
385
    /**
386
     * @param string $channel
387
     * @param int $class_id
388
     * @param int $weight
389
     * @param int $body_size
390
     * @param string $packed_properties
391
     * @param string $body
392
     * @param AMQPWriter $pkt
393
     */
394 66
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
395
    {
396 66
        $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
397 66
        $this->write($pkt->getvalue());
0 ignored issues
show
Bug introduced by
It seems like $pkt is not always an object, but can also be of type null. Maybe add an additional type check?

If a variable is not always an object, we recommend to add an additional type check to ensure your method call is safe:

function someFunction(A $objectMaybe = null)
{
    if ($objectMaybe instanceof A) {
        $objectMaybe->doSomething();
    }
}
Loading history...
398 66
    }
399
400
    /**
401
     * Returns a new AMQPWriter or mutates the provided $pkt
402
     *
403
     * @param string $channel
404
     * @param int $class_id
405
     * @param int $weight
406
     * @param int $body_size
407
     * @param string $packed_properties
408
     * @param string $body
409
     * @param AMQPWriter $pkt
410
     * @return AMQPWriter
411
     */
412 66
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
413
    {
414 66
        $pkt = $pkt ?: new AMQPWriter();
415
416
        // Content already prepared ?
417 66
        $key_cache = sprintf(
418 66
            '%s|%s|%s|%s',
419 55
            $channel,
420 55
            $packed_properties,
421 55
            $class_id,
422 22
            $weight
423 33
        );
424
425 66
        if (!isset($this->prepare_content_cache[$key_cache])) {
426 66
            $w = new AMQPWriter();
427 66
            $w->write_octet(2);
428 66
            $w->write_short($channel);
429 66
            $w->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
430 66
            $w->write_short($class_id);
431 66
            $w->write_short($weight);
432 66
            $this->prepare_content_cache[$key_cache] = $w->getvalue();
433 66
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
434
                reset($this->prepare_content_cache);
435
                $old_key = key($this->prepare_content_cache);
436
                unset($this->prepare_content_cache[$old_key]);
437
            }
438 33
        }
439 66
        $pkt->write($this->prepare_content_cache[$key_cache]);
440
441 66
        $pkt->write_longlong($body_size);
442 66
        $pkt->write($packed_properties);
443
444 66
        $pkt->write_octet(0xCE);
445
446
447
        // memory efficiency: walk the string instead of biting
448
        // it. good for very large packets (close in size to
449
        // memory_limit setting)
450 66
        $position = 0;
451 66
        $bodyLength = mb_strlen($body,'ASCII');
452 66
        while ($position < $bodyLength) {
453 60
            $payload = mb_substr($body, $position, $this->frame_max - 8, 'ASCII');
454 60
            $position += $this->frame_max - 8;
455
456 60
            $pkt->write_octet(3);
457 60
            $pkt->write_short($channel);
458 60
            $pkt->write_long(mb_strlen($payload, 'ASCII'));
459
460 60
            $pkt->write($payload);
461
462 60
            $pkt->write_octet(0xCE);
463 30
        }
464
465 66
        return $pkt;
466
    }
467
468
    /**
469
     * @param string $channel
470
     * @param array $method_sig
471
     * @param AMQPWriter|string $args
472
     * @param null $pkt
473
     */
474 180
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
475
    {
476 180
        $pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
477 180
        $this->write($pkt->getvalue());
478 180
        $this->debug->debug_method_signature1($method_sig);
0 ignored issues
show
Documentation introduced by
$method_sig is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
479 180
    }
480
481
    /**
482
     * Returns a new AMQPWriter or mutates the provided $pkt
483
     *
484
     * @param string $channel
485
     * @param array $method_sig
486
     * @param AMQPWriter|string $args
487
     * @param AMQPWriter $pkt
488
     * @return AMQPWriter
489
     */
490 180
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
491
    {
492 180
        if ($args instanceof AMQPWriter) {
493 180
            $args = $args->getvalue();
494 90
        }
495
496 180
        $pkt = $pkt ?: new AMQPWriter();
497
498 180
        $pkt->write_octet(1);
499 180
        $pkt->write_short($channel);
500 180
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4); // 4 = length of class_id and method_id
501
        // in payload
502
503 180
        $pkt->write_short($method_sig[0]); // class_id
504 180
        $pkt->write_short($method_sig[1]); // method_id
505 180
        $pkt->write($args);
506
507 180
        $pkt->write_octet(0xCE);
508
509 180
        $this->debug->debug_method_signature1($method_sig);
0 ignored issues
show
Documentation introduced by
$method_sig is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
510
511 180
        return $pkt;
512
    }
513
514
    /**
515
     * Waits for a frame from the server
516
     *
517
     * @param int|float|null $timeout
518
     * @return array
519
     * @throws \Exception
520
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
521
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
522
     */
523 180
    protected function wait_frame($timeout = 0)
524
    {
525 180
        if (is_null($this->input))
526 90
        {
527
            $this->setIsConnected(false);
528
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
529
        }
530
531 180
        $currentTimeout = $this->input->getTimeout();
532 180
        $this->input->setTimeout($timeout);
533
534
        try {
535
            // frame_type + channel_id + size
536 180
            $this->wait_frame_reader->reuse(
537 180
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
538 90
            );
539
540 180
            $frame_type = $this->wait_frame_reader->read_octet();
541 180
            $class = self::$PROTOCOL_CONSTANTS_CLASS;
542 180
            if (!array_key_exists($frame_type, $class::$FRAME_TYPES)) {
543
                throw new AMQPInvalidFrameException('Invalid frame type ' . $frame_type);
544
            }
545 180
            $channel = $this->wait_frame_reader->read_short();
546 180
            $size = $this->wait_frame_reader->read_long();
547
548
            // payload + ch
549 180
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));
550
551 180
            $payload = $this->wait_frame_reader->read($size);
552 180
            $ch = $this->wait_frame_reader->read_octet();
553
554 129
        } catch (AMQPTimeoutException $e) {
555 30
            $this->input->setTimeout($currentTimeout);
556 30
            throw $e;
557 48
        } catch (AMQPNoDataException $e) {
558 48
            if ($this->input) {
559 48
                $this->input->setTimeout($currentTimeout);
560 24
            }
561 48
            throw $e;
562
        } catch (AMQPConnectionClosedException $exception) {
563
            $this->do_close();
564
            throw $exception;
565
        }
566
567 180
        $this->input->setTimeout($currentTimeout);
568
569 180
        if ($ch != 0xCE) {
570
            throw new AMQPInvalidFrameException(sprintf(
571
                'Framing error, unexpected byte: %x',
572
                $ch
573
            ));
574
        }
575
576 180
        return array($frame_type, $channel, $payload);
577
    }
578
579
    /**
580
     * Waits for a frame from the server destined for a particular channel.
581
     *
582
     * @param string $channel_id
583
     * @param int|float|null $timeout
584
     * @return array
585
     */
586 180
    protected function wait_channel($channel_id, $timeout = 0)
587
    {
588
        // Keeping the original timeout unchanged.
589 180
        $_timeout = $timeout;
590 180
        while (true) {
591 180
            $now = time();
592
            try {
593 180
                list($frame_type, $frame_channel, $payload) = $this->wait_frame($_timeout);
594 129
            } catch (AMQPTimeoutException $e) {
595 30
                if ( $this->heartbeat && microtime(true) - ($this->heartbeat*2) > $this->last_frame ) {
596
                    $this->debug->debug_msg("missed server heartbeat (at threshold * 2)");
597
                    $this->setIsConnected(false);
598
                    throw new AMQPHeartbeatMissedException("Missed server heartbeat");
599
                }
600
601 30
                throw $e;
602
            }
603
604 180
            $this->last_frame = microtime(true);
605
606 180
            if ($frame_channel === 0 && $frame_type === 8) {
607
                // skip heartbeat frames and reduce the timeout by the time passed
608
                $this->debug->debug_msg("received server heartbeat");
609
                if($_timeout > 0) {
610
                    $_timeout -= time() - $now;
611
                    if($_timeout <= 0) {
612
                        // If timeout has been reached, throw the exception without calling wait_frame
613
                        throw new AMQPTimeoutException("Timeout waiting on channel");
614
                    }
615
                }
616
                continue;
617
618
            } else {
619
620 180
                if ($frame_channel == $channel_id) {
621 180
                    return array($frame_type, $payload);
622
                }
623
624
                // Not the channel we were looking for.  Queue this frame
625
                //for later, when the other channel is looking for frames.
626
                // Make sure the channel still exists, it could have been
627
                // closed by a previous Exception.
628 6
                if (isset($this->channels[$frame_channel])) {
629 6
                    array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
0 ignored issues
show
Bug introduced by
The property frame_queue cannot be accessed from this context as it is declared protected in class PhpAmqpLib\Channel\AbstractChannel.

This check looks for access to properties that are not accessible from the current context.

If you need to make a property accessible to another context you can either raise its visibility level or provide an accessible getter in the defining class.

Loading history...
630 3
                }
631
632
                // If we just queued up a method for channel 0 (the Connection
633
                // itself) it's probably a close method in reaction to some
634
                // error, so deal with it right away.
635 6
                if (($frame_type == 1) && ($frame_channel == 0)) {
636
                    $this->wait();
637
                }
638
            }
639 3
        }
640
    }
641
642
    /**
643
     * Fetches a channel object identified by the numeric channel_id, or
644
     * create that object if it doesn't already exist.
645
     *
646
     * @param int $channel_id
647
     * @return AMQPChannel
648
     */
649 174
    public function channel($channel_id = null)
650
    {
651 174
        if (isset($this->channels[$channel_id])) {
652
            return $this->channels[$channel_id];
653
        }
654
655 174
        $channel_id = $channel_id ? $channel_id : $this->get_free_channel_id();
656 174
        $ch = new AMQPChannel($this->connection, $channel_id, true, $this->channel_rpc_timeout);
657 174
        $this->channels[$channel_id] = $ch;
658
659 174
        return $ch;
660
    }
661
662
    /**
663
     * Requests a connection close
664
     *
665
     * @param int $reply_code
666
     * @param string $reply_text
667
     * @param array $method_sig
668
     * @return mixed|null
669
     */
670 144
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
671
    {
672 144
        $this->io->disableHeartbeat();
673 144
        if (empty($this->protocolWriter) || !$this->isConnected()) {
674 18
            return null;
675
        }
676
677 144
        $this->closeChannels();
678
679 144
        list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
680 144
            $reply_code,
681 120
            $reply_text,
682 144
            $method_sig[0],
683 144
            $method_sig[1]
684 72
        );
685 144
        $this->send_method_frame(array($class_id, $method_id), $args);
686
687 144
        $this->setIsConnected(false);
688
689 144
        return $this->wait(array(
690 144
            $this->waitHelper->get_wait('connection.close_ok')
691 144
        ),false,$this->connection_timeout);
692
    }
693
694
    /**
695
     * @param AMQPReader $reader
696
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException
697
     */
698
    protected function connection_close(AMQPReader $reader)
699
    {
700
        $reply_code = $reader->read_short();
701
        $reply_text = $reader->read_shortstr();
702
        $class_id = $reader->read_short();
703
        $method_id = $reader->read_short();
704
705
        $this->x_close_ok();
706
707
        throw new AMQPProtocolConnectionException($reply_code, $reply_text, array($class_id, $method_id));
708
    }
709
710
    /**
711
     * Confirms a connection close
712
     */
713
    protected function x_close_ok()
714
    {
715
        $this->send_method_frame(
716
            explode(',', $this->waitHelper->get_wait('connection.close_ok'))
717
        );
718
        $this->do_close();
719
    }
720
721
    /**
722
     * Confirm a connection close
723
     *
724
     * @param AMQPReader $args
725
     */
726 144
    protected function connection_close_ok($args)
0 ignored issues
show
Unused Code introduced by
The parameter $args is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
727
    {
728 144
        $this->do_close();
729 144
    }
730
731
    /**
732
     * @param string $virtual_host
733
     * @param string $capabilities
734
     * @param bool $insist
735
     * @return mixed
736
     */
737 180
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
738
    {
739 180
        $args = new AMQPWriter();
740 180
        $args->write_shortstr($virtual_host);
741 180
        $args->write_shortstr($capabilities);
742 180
        $args->write_bits(array($insist));
743 180
        $this->send_method_frame(array(10, 40), $args);
744
745
        $wait = array(
746 180
            $this->waitHelper->get_wait('connection.open_ok')
747 90
        );
748
749 180
        if ($this->protocolVersion == '0.8') {
750
            $wait[] = $this->waitHelper->get_wait('connection.redirect');
751
        }
752
753 180
        return $this->wait($wait);
754
    }
755
756
    /**
757
     * Signals that the connection is ready
758
     *
759
     * @param AMQPReader $args
760
     */
761 180
    protected function connection_open_ok($args)
762
    {
763 180
        $this->known_hosts = $args->read_shortstr();
764 180
        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
765 180
    }
766
767
    /**
768
     * Asks the client to use a different server
769
     *
770
     * @param AMQPReader $args
771
     * @return string
772
     */
773
    protected function connection_redirect($args)
774
    {
775
        $host = $args->read_shortstr();
776
        $this->known_hosts = $args->read_shortstr();
777
        $this->debug->debug_msg(sprintf(
778
                'Redirected to [%s], known_hosts [%s]',
779
                $host,
780
                $this->known_hosts
781
            ));
782
783
        return $host;
784
    }
785
786
    /**
787
     * Security mechanism challenge
788
     *
789
     * @param AMQPReader $args
790
     */
791
    protected function connection_secure($args)
792
    {
793
        $challenge = $args->read_longstr();
0 ignored issues
show
Unused Code introduced by
$challenge is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
794
    }
795
796
    /**
797
     * Security mechanism response
798
     *
799
     * @param string $response
800
     */
801
    protected function x_secure_ok($response)
802
    {
803
        $args = new AMQPWriter();
804
        $args->write_longstr($response);
805
        $this->send_method_frame(array(10, 21), $args);
806
    }
807
808
    /**
809
     * Starts connection negotiation
810
     *
811
     * @param AMQPReader $args
812
     */
813 180
    protected function connection_start($args)
814
    {
815 180
        $this->version_major = $args->read_octet();
816 180
        $this->version_minor = $args->read_octet();
817 180
        $this->server_properties = $args->read_table();
0 ignored issues
show
Documentation Bug introduced by
It seems like $args->read_table() can also be of type object<PhpAmqpLib\Wire\AMQPTable>. However, the property $server_properties is declared as type array. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
818 180
        $this->mechanisms = explode(' ', $args->read_longstr());
819 180
        $this->locales = explode(' ', $args->read_longstr());
820
821 180
        $this->debug->debug_connection_start(
822 180
            $this->version_major,
823 180
            $this->version_minor,
824 180
            $this->server_properties,
0 ignored issues
show
Bug introduced by
It seems like $this->server_properties can also be of type object<PhpAmqpLib\Wire\AMQPTable>; however, PhpAmqpLib\Helper\DebugH...ebug_connection_start() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
825 180
            $this->mechanisms,
826 180
            $this->locales
827 90
        );
828 180
    }
829
830
    /**
831
     * @param AMQPTable|array $clientProperties
832
     * @param string $mechanism
833
     * @param string $response
834
     * @param string $locale
835
     */
836 180
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
837
    {
838 180
        $args = new AMQPWriter();
839 180
        $args->write_table($clientProperties);
840 180
        $args->write_shortstr($mechanism);
841 180
        $args->write_longstr($response);
842 180
        $args->write_shortstr($locale);
843 180
        $this->send_method_frame(array(10, 11), $args);
844 180
    }
845
846
    /**
847
     * Proposes connection tuning parameters
848
     *
849
     * @param AMQPReader $args
850
     */
851 180
    protected function connection_tune($args)
852
    {
853 180
        $v = $args->read_short();
854 180
        if ($v) {
855
            $this->channel_max = $v;
856
        }
857
858 180
        $v = $args->read_long();
859 180
        if ($v) {
860 180
            $this->frame_max = $v;
0 ignored issues
show
Documentation Bug introduced by
The property $frame_max was declared of type integer, but $v is of type string. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
861 90
        }
862
863
        // use server proposed value if not set
864 180
        if ($this->heartbeat === null) {
865
            $this->heartbeat = $args->read_short();
866
        }
867
868 180
        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
869 180
    }
870
871
    /**
872
     * Negotiates connection tuning parameters
873
     *
874
     * @param int $channel_max
875
     * @param int $frame_max
876
     * @param int $heartbeat
877
     */
878 180
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
879
    {
880 180
        $args = new AMQPWriter();
881 180
        $args->write_short($channel_max);
882 180
        $args->write_long($frame_max);
883 180
        $args->write_short($heartbeat);
884 180
        $this->send_method_frame(array(10, 31), $args);
885 180
        $this->wait_tune_ok = false;
886 180
    }
887
888
    /**
889
     * @return AbstractIO
890
     */
891
    public function getSocket()
892
    {
893
        return $this->io->getSocket();
894
    }
895
896
    /**
897
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
898
     */
899 162
    public function getIO()
900
    {
901 162
        return $this->io;
902
    }
903
904
    /**
905
     * Handles connection blocked notifications
906
     *
907
     * @param AMQPReader $args
908
     */
909
    protected function connection_blocked(AMQPReader $args)
910
    {
911
        // Call the block handler and pass in the reason
912
        $this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr()));
913
    }
914
915
    /**
916
     * Handles connection unblocked notifications
917
     *
918
     * @param AMQPReader $args
919
     */
920
    protected function connection_unblocked(AMQPReader $args)
0 ignored issues
show
Unused Code introduced by
The parameter $args is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
921
    {
922
        // No args to an unblock event
923
        $this->dispatch_to_handler($this->connection_unblock_handler, array());
924
    }
925
926
    /**
927
     * Sets a handler which is called whenever a connection.block is sent from the server
928
     *
929
     * @param callable $callback
930
     */
931
    public function set_connection_block_handler($callback)
932
    {
933
        $this->connection_block_handler = $callback;
934
    }
935
936
    /**
937
     * Sets a handler which is called whenever a connection.block is sent from the server
938
     *
939
     * @param callable $callback
940
     */
941
    public function set_connection_unblock_handler($callback)
942
    {
943
        $this->connection_unblock_handler = $callback;
944
    }
945
946
    /**
947
     * Gets the connection status
948
     *
949
     * @return bool
950
     */
951 186
    public function isConnected()
952
    {
953 186
        return (bool) $this->is_connected;
954
    }
955
956
    /**
957
     * Set the connection status
958
     *
959
     * @param bool $is_connected
960
     */
961 186
    protected function setIsConnected($is_connected)
962
    {
963 186
        $this->is_connected = (bool) $is_connected;
964 186
    }
965
966
    /**
967
     * Closes all available channels
968
     */
969 150
    protected function closeChannels()
970
    {
971 150
        foreach ($this->channels as $key => $channel) {
972
            // channels[0] is this connection object, so don't close it yet
973 144
            if ($key === 0) {
974 144
                continue;
975
            }
976
            try {
977 48
                $channel->close();
978 40
            } catch (\Exception $e) {
979
                /* Ignore closing errors */
980
            }
981 75
        }
982 150
    }
983
984
    /**
985
     * Should the connection be attempted during construction?
986
     *
987
     * @return bool
988
     */
989 162
    public function connectOnConstruct()
990
    {
991 162
        return true;
992
    }
993
994
    /**
995
     * @return array
996
     */
997
    public function getServerProperties()
998
    {
999
        return $this->server_properties;
1000
    }
1001
1002
    /**
1003
     * Get the library properties for populating the client protocol information
1004
     *
1005
     * @return array
1006
     */
1007 192
    public function getLibraryProperties()
1008
    {
1009 192
        return self::$LIBRARY_PROPERTIES;
1010
    }
1011
1012
    public static function create_connection($hosts, $options = array()){
1013
        $latest_exception = null;
1014
        for($i = 0; $i < count($hosts); $i++) {
0 ignored issues
show
Performance Best Practice introduced by
It seems like you are calling the size function count() as part of the test condition. You might want to compute the size beforehand, and not on each iteration.

If the size of the collection does not change during the iteration, it is generally a good practice to compute it beforehand, and not on each iteration:

for ($i=0; $i<count($array); $i++) { // calls count() on each iteration
}

// Better
for ($i=0, $c=count($array); $i<$c; $i++) { // calls count() just once
}
Loading history...
1015
            AbstractConnection::validate_host($hosts[$i]);
1016
            $host = $hosts[$i]['host'];
1017
            $port = $hosts[$i]['port'];
1018
            $user = $hosts[$i]['user'];
1019
            $password = $hosts[$i]['password'];
1020
            $vhost = isset($hosts[$i]['vhost']) ? $hosts[$i]['vhost'] : "/";
1021
            try {
1022
                $conn = static::try_create_connection($host, $port, $user, $password, $vhost, $options);
0 ignored issues
show
Bug introduced by
The method try_create_connection() does not exist on PhpAmqpLib\Connection\AbstractConnection. Did you maybe mean connect()?

This check marks calls to methods that do not seem to exist on an object.

This is most likely the result of a method being renamed without all references to it being renamed likewise.

Loading history...
1023
                return $conn;
1024
            } catch (\Exception $e) {
1025
                $latest_exception = $e;
1026
            }
1027
        }
1028
        throw $latest_exception;
1029
    }
1030
1031
    public static function validate_host($host) {
1032
        if(!isset($host['host'])){
1033
            throw new \InvalidArgumentException("'host' key is required.");
1034
        }
1035
        if(!isset($host['port'])){
1036
            throw new \InvalidArgumentException("'port' key is required.");
1037
        }
1038
        if(!isset($host['user'])){
1039
            throw new \InvalidArgumentException("'user' key is required.");
1040
        }
1041
        if(!isset($host['password'])){
1042
            throw new \InvalidArgumentException("'password' key is required.");
1043
        }
1044
    }
1045
}
1046