Completed
Push — master ( 2a2905...08a817 )
by
unknown
11s
created

AbstractConnection::create_connection()   A

Complexity

Conditions 4
Paths 5

Size

Total Lines 18
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
dl 0
loc 18
ccs 0
cts 15
cp 0
rs 9.2
c 0
b 0
f 0
cc 4
eloc 15
nc 5
nop 2
crap 20
1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AMQPChannel;
5
use PhpAmqpLib\Channel\AbstractChannel;
6
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPTable;
11
use PhpAmqpLib\Wire\AMQPWriter;
12
use PhpAmqpLib\Wire\IO\AbstractIO;
13
use PhpAmqpLib\Wire\IO\SocketIO;
14
use PhpAmqpLib\Wire\IO\StreamIO;
15
16
class AbstractConnection extends AbstractChannel
17
{
18
    /** @var array */
19
    public static $LIBRARY_PROPERTIES = array(
20
        'product' => array('S', 'AMQPLib'),
21
        'platform' => array('S', 'PHP'),
22
        'version' => array('S', '2.6'),
23
        'information' => array('S', ''),
24
        'copyright' => array('S', ''),
25
        'capabilities' => array(
26
            'F',
27
            array(
28
                'authentication_failure_close' => array('t', true),
29
                'publisher_confirms' => array('t', true),
30
                'consumer_cancel_notify' => array('t', true),
31
                'exchange_exchange_bindings' => array('t', true),
32
                'basic.nack' => array('t', true),
33
                'connection.blocked' => array('t', true)
34
            )
35
        )
36
    );
37
38
    /** @var AMQPChannel[] */
39
    public $channels = array();
40
41
    /** @var int */
42
    protected $version_major;
43
44
    /** @var int */
45
    protected $version_minor;
46
47
    /** @var array */
48
    protected $server_properties;
49
50
    /** @var array */
51
    protected $mechanisms;
52
53
    /** @var array */
54
    protected $locales;
55
56
    /** @var bool */
57
    protected $wait_tune_ok;
58
59
    /** @var string */
60
    protected $known_hosts;
61
62
    /** @var AMQPReader */
63
    protected $input;
64
65
    /** @var string */
66
    protected $vhost;
67
68
    /** @var bool */
69
    protected $insist;
70
71
    /** @var string */
72
    protected $login_method;
73
74
    /** @var string */
75
    protected $login_response;
76
77
    /** @var string */
78
    protected $locale;
79
80
    /** @var int */
81
    protected $heartbeat;
82
83
    /** @var float */
84
    protected $last_frame;
85
86
    /** @var SocketIO */
87
    protected $sock;
88
89
    /** @var int */
90
    protected $channel_max = 65535;
91
92
    /** @var int */
93
    protected $frame_max = 131072;
94
95
     /** @var array Constructor parameters for clone */
96
    protected $construct_params;
97
98
    /** @var bool Close the connection in destructor */
99
    protected $close_on_destruct = true;
100
101
    /** @var bool Maintain connection status */
102
    protected $is_connected = false;
103
104
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
105
    protected $io;
106
107
    /** @var \PhpAmqpLib\Wire\AMQPReader */
108
    protected $wait_frame_reader;
109
110
    /** @var callable Handles connection blocking from the server */
111
    private $connection_block_handler;
112
113
    /** @var callable Handles connection unblocking from the server */
114
    private $connection_unblock_handler;
115
116
    /** @var int Connection timeout value*/
117
    protected $connection_timeout ;
118
119
    /**
120
     * Circular buffer to speed up prepare_content().
121
     * Max size limited by $prepare_content_cache_max_size.
122
     *
123
     * @var array
124
     * @see prepare_content()
125
     */
126
    private $prepare_content_cache;
127
128
    /** @var int Maximal size of $prepare_content_cache */
129
    private $prepare_content_cache_max_size;
130
131
    /**
132
     * @param string $user
133
     * @param string $password
134
     * @param string $vhost
135
     * @param bool $insist
136
     * @param string $login_method
137
     * @param null $login_response
138
     * @param string $locale
139
     * @param AbstractIO $io
140
     * @param int $heartbeat
141
     * @param int $connection_timeout
142
     * @throws \Exception
143
     */
144 90
    public function __construct(
145
        $user,
146
        $password,
147
        $vhost = '/',
148
        $insist = false,
149
        $login_method = 'AMQPLAIN',
150
        $login_response = null,
151
        $locale = 'en_US',
152
        AbstractIO $io,
153
        $heartbeat = 0,
154
        $connection_timeout = 0
155
    ) {
156
        // save the params for the use of __clone
157 90
        $this->construct_params = func_get_args();
158
159 90
        $this->wait_frame_reader = new AMQPReader(null);
160 90
        $this->vhost = $vhost;
161 90
        $this->insist = $insist;
162 90
        $this->login_method = $login_method;
163 90
        $this->login_response = $login_response;
164 90
        $this->locale = $locale;
165 90
        $this->io = $io;
166 90
        $this->heartbeat = $heartbeat;
167 90
        $this->connection_timeout = $connection_timeout;
168
169 90
        if ($user && $password) {
170 90
            $this->login_response = new AMQPWriter();
0 ignored issues
show
Documentation Bug introduced by
It seems like new \PhpAmqpLib\Wire\AMQPWriter() of type object<PhpAmqpLib\Wire\AMQPWriter> is incompatible with the declared type string of property $login_response.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property.

Loading history...
171 90
            $this->login_response->write_table(array(
172 90
                'LOGIN' => array('S', $user),
173 90
                'PASSWORD' => array('S', $password)
174 60
            ));
175
176
            // Skip the length
177 90
            $responseValue = $this->login_response->getvalue();
178 90
            $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
179
180 60
        } else {
181
            $this->login_response = null;
182
        }
183
184 90
        $this->prepare_content_cache = array();
185 90
        $this->prepare_content_cache_max_size = 100;
186
187
        // Lazy Connection waits on connecting
188 90
        if ($this->connectOnConstruct()) {
189 66
            $this->connect();
190 40
        }
191 84
    }
192
193
    /**
194
     * Connects to the AMQP server
195
     */
196 90
    protected function connect()
197
    {
198
        try {
199
            // Loop until we connect
200 90
            while (!$this->isConnected()) {
201
                // Assume we will connect, until we dont
202 90
                $this->setIsConnected(true);
203
204
                // Connect the socket
205 90
                $this->getIO()->connect();
206
207 84
                $this->channels = array();
208
                // The connection object itself is treated as channel 0
209 84
                parent::__construct($this, 0);
0 ignored issues
show
Comprehensibility Bug introduced by
It seems like you call parent on a different method (__construct() instead of connect()). Are you sure this is correct? If so, you might want to change this to $this->__construct().

This check looks for a call to a parent method whose name is different than the method from which it is called.

Consider the following code:

class Daddy
{
    protected function getFirstName()
    {
        return "Eidur";
    }

    protected function getSurName()
    {
        return "Gudjohnsen";
    }
}

class Son extends Daddy
{
    public function getFirstName()
    {
        return parent::getSurname();
    }
}

The getFirstName() method in the Son calls the wrong method in the parent class.

Loading history...
210
211 84
                $this->input = new AMQPReader(null, $this->getIO());
212
213 84
                $this->write($this->amqp_protocol_header);
214 84
                $this->wait(array($this->waitHelper->get_wait('connection.start')),false,$this->connection_timeout);
215 84
                $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale);
216
217 84
                $this->wait_tune_ok = true;
218 84
                while ($this->wait_tune_ok) {
219 84
                    $this->wait(array(
220 84
                        $this->waitHelper->get_wait('connection.secure'),
221 84
                        $this->waitHelper->get_wait('connection.tune')
222 56
                    ));
223 56
                }
224
225 84
                $host = $this->x_open($this->vhost, '', $this->insist);
226 84
                if (!$host) {
227
                    //Reconnected
228 84
                    if ($this->io instanceof StreamIO)
229 56
                    {
230 36
                        $this->getIO()->reenableHeartbeat();
0 ignored issues
show
Bug introduced by
It seems like you are coding against a specific sub-type rather than the parent class PhpAmqpLib\Wire\IO\AbstractIO, as the method reenableHeartbeat() exists only in the following sub-classes of PhpAmqpLib\Wire\IO\AbstractIO: PhpAmqpLib\Wire\IO\StreamIO. Maybe you want to add an explicit instanceof check for one of these?

Let’s take a look at an example:

abstract class User
{
    /** @return string */
    abstract public function getPassword();
}

class MyUser extends User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you only pass instances of MyUser. However, if you also want to pass a different sub-class of User that does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  3. Add the method to the parent class:

    abstract class User
    {
        /** @return string */
        abstract public function getPassword();
    
        /** @return string */
        abstract public function getDisplayName();
    }
    
Loading history...
231 24
                    }
232 84
                    return null; // we weren't redirected
233
                }
234
235
                $this->setIsConnected(false);
236
                $this->closeChannels();
237
238
                // we were redirected, close the socket, loop and try again
239
                $this->close_socket();
240
            }
241
242 22
        } catch (\Exception $e) {
243
            // Something went wrong, set the connection status
244 6
            $this->setIsConnected(false);
245 6
            $this->closeChannels();
246 6
            throw $e; // Rethrow exception
247
        }
248 24
    }
249
250
    /**
251
     * Reconnects using the original connection settings.
252
     * This will not recreate any channels that were established previously
253
     */
254 36
    public function reconnect()
255
    {
256
        // Try to close the AMQP connection
257 36
        $this->safeClose();
258
        // Reconnect the socket/stream then AMQP
259 36
        $this->getIO()->reconnect();
260 36
        $this->setIsConnected(false); // getIO can initiate the connection setting via LazyConnection, set it here to be sure
261 36
        $this->connect();
262 36
    }
263
264
    /**
265
     * Cloning will use the old properties to make a new connection to the same server
266
     */
267
    public function __clone()
268
    {
269
        call_user_func_array(array($this, '__construct'), $this->construct_params);
270
    }
271
272
    public function __destruct()
    {
        // Callers may opt out of closing on destruction via set_close_on_destruct().
        if (!$this->close_on_destruct) {
            return;
        }

        $this->safeClose();
    }
278
279
    /**
280
     * Attempts to close the connection safely
281
     */
282 36
    protected function safeClose()
    {
        // No reader set (or already released): there is nothing to close.
        if (empty($this->input)) {
            return;
        }

        try {
            $this->close();
        } catch (\Exception $e) {
            // Best effort: any failure raised while closing is deliberately ignored.
        }
    }
292
293
    /**
294
     * @param int $sec
295
     * @param int $usec
296
     * @return mixed
297
     */
298
    public function select($sec, $usec = 0)
299
    {
300
        return $this->getIO()->select($sec, $usec);
301
    }
302
303
    /**
304
     * Allows to not close the connection
305
     * it's useful after the fork when you don't want to close parent process connection
306
     *
307
     * @param bool $close
308
     */
309
    public function set_close_on_destruct($close = true)
310
    {
311
        $this->close_on_destruct = (bool) $close;
312
    }
313
314 84
    protected function close_input()
    {
        $this->debug->debug_msg('closing input');

        // The reader has already been released.
        if ($this->input === null) {
            return;
        }

        $this->input->close();
        $this->input = null;
    }
323
324 84
    protected function close_socket()
    {
        $this->debug->debug_msg('closing socket');

        // No IO object available: nothing to close.
        if (null === $this->getIO()) {
            return;
        }

        $this->getIO()->close();
    }
332
333
    /**
334
     * @param $data
335
     */
336 84
    public function write($data)
337
    {
338 84
        $this->debug->debug_hexdump($data);
339
340
        try {
341 84
            $this->getIO()->write($data);
342 56
        } catch (AMQPRuntimeException $e) {
343
            $this->setIsConnected(false);
344
            throw $e;
345
        }
346 84
    }
347
348 84
    protected function do_close()
349
    {
350 84
        $this->setIsConnected(false);
351 84
        $this->close_input();
352 84
        $this->close_socket();
353 84
    }
354
355
    /**
356
     * @return int
357
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
358
     */
359 84
    public function get_free_channel_id()
    {
        // Channel 0 is the connection itself, so the search starts at 1.
        $candidate = 1;
        while ($candidate <= $this->channel_max) {
            if (!isset($this->channels[$candidate])) {
                return $candidate;
            }
            $candidate++;
        }

        throw new AMQPRuntimeException('No free channel ids');
    }
369
370
    /**
371
     * @param string $channel
372
     * @param int $class_id
373
     * @param int $weight
374
     * @param int $body_size
375
     * @param string $packed_properties
376
     * @param string $body
377
     * @param AMQPWriter $pkt
378
     */
379 72
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        // prepare_content() creates a new AMQPWriter when $pkt is null and returns
        // the writer it filled. The return value must therefore be captured:
        // calling getvalue() on the untouched $pkt argument fails for the default
        // $pkt = null.
        $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);

        // Send the serialized frames over the connection.
        $this->write($pkt->getvalue());
    }
384
385
    /**
386
     * Returns a new AMQPWriter or mutates the provided $pkt
387
     *
388
     * @param string $channel
389
     * @param int $class_id
390
     * @param int $weight
391
     * @param int $body_size
392
     * @param string $packed_properties
393
     * @param string $body
394
     * @param AMQPWriter $pkt
395
     * @return AMQPWriter
396
     */
397 72
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        // The start of the header frame depends only on these four values, so its
        // serialized form is cached under a key built from them.
        $cache_id = sprintf('%s|%s|%s|%s', $channel, $packed_properties, $class_id, $weight);

        if (!isset($this->prepare_content_cache[$cache_id])) {
            $header = new AMQPWriter();
            $header->write_octet(2);
            $header->write_short($channel);
            $header->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $header->write_short($class_id);
            $header->write_short($weight);
            $this->prepare_content_cache[$cache_id] = $header->getvalue();

            // Keep the cache bounded: once it grows past the limit, drop its first entry.
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                reset($this->prepare_content_cache);
                $evicted_key = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$evicted_key]);
            }
        }

        $pkt->write($this->prepare_content_cache[$cache_id]);
        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);
        $pkt->write_octet(0xCE);

        // Walk the body in slices of at most (frame_max - 8) bytes instead of
        // splitting it up front, which keeps memory use low for very large bodies.
        $chunk_size = $this->frame_max - 8;
        $total = mb_strlen($body, 'ASCII');
        for ($offset = 0; $offset < $total; $offset += $chunk_size) {
            $chunk = mb_substr($body, $offset, $chunk_size, 'ASCII');

            $pkt->write_octet(3);
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($chunk, 'ASCII'));
            $pkt->write($chunk);
            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }
452
453
    /**
454
     * @param string $channel
455
     * @param array $method_sig
456
     * @param AMQPWriter|string $args
457
     * @param null $pkt
458
     */
459 84
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
460
    {
461 84
        $pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
462 84
        $this->write($pkt->getvalue());
463 84
        $this->debug->debug_method_signature1($method_sig);
0 ignored issues
show
Documentation introduced by
$method_sig is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
464 84
    }
465
466
    /**
467
     * Returns a new AMQPWriter or mutates the provided $pkt
468
     *
469
     * @param string $channel
470
     * @param array $method_sig
471
     * @param AMQPWriter|string $args
472
     * @param AMQPWriter $pkt
473
     * @return AMQPWriter
474
     */
475 84
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
476
    {
477 84
        if ($args instanceof AMQPWriter) {
478 84
            $args = $args->getvalue();
479 56
        }
480
481 84
        $pkt = $pkt ?: new AMQPWriter();
482
483 84
        $pkt->write_octet(1);
484 84
        $pkt->write_short($channel);
485 84
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4); // 4 = length of class_id and method_id
486
        // in payload
487
488 84
        $pkt->write_short($method_sig[0]); // class_id
489 84
        $pkt->write_short($method_sig[1]); // method_id
490 84
        $pkt->write($args);
491
492 84
        $pkt->write_octet(0xCE);
493
494 84
        $this->debug->debug_method_signature1($method_sig);
0 ignored issues
show
Documentation introduced by
$method_sig is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
495
496 84
        return $pkt;
497
    }
498
499
    /**
500
     * Waits for a frame from the server
501
     *
502
     * @param int $timeout
503
     * @return array
504
     * @throws \Exception
505
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
506
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
507
     */
508 84
    protected function wait_frame($timeout = 0)
509
    {
510 84
        if (is_null($this->input))
511 56
        {
512
            $this->setIsConnected(false);
513
            throw new AMQPRuntimeException('Broken pipe or closed connection');
514
        }
515
516 84
        $currentTimeout = $this->input->getTimeout();
517 84
        $this->input->setTimeout($timeout);
518
519
        try {
520
            // frame_type + channel_id + size
521 84
            $this->wait_frame_reader->reuse(
522 84
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
523 56
            );
524
525 84
            $frame_type = $this->wait_frame_reader->read_octet();
526 84
            $class = self::$PROTOCOL_CONSTANTS_CLASS;
527 84
            if (!array_key_exists($frame_type, $class::$FRAME_TYPES)) {
528
                throw new AMQPRuntimeException('Invalid frame type ' . $frame_type);
529
            }
530 84
            $channel = $this->wait_frame_reader->read_short();
531 84
            $size = $this->wait_frame_reader->read_long();
532
533
            // payload + ch
534 84
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));
535
536 84
            $payload = $this->wait_frame_reader->read($size);
537 84
            $ch = $this->wait_frame_reader->read_octet();
538
539 56
        } catch (AMQPTimeoutException $e) {
540
            $this->input->setTimeout($currentTimeout);
541
            throw $e;
542
        }
543
544 84
        $this->input->setTimeout($currentTimeout);
545
546 84
        if ($ch != 0xCE) {
547
            throw new AMQPRuntimeException(sprintf(
548
                'Framing error, unexpected byte: %x',
549
                $ch
550
            ));
551
        }
552
553 84
        return array($frame_type, $channel, $payload);
554
    }
555
556
    /**
557
     * Waits for a frame from the server destined for a particular channel.
558
     *
559
     * @param string $channel_id
560
     * @param int $timeout
561
     * @return array
562
     */
563 84
    protected function wait_channel($channel_id, $timeout = 0)
564
    {
565
        // Keeping the original timeout unchanged.
566 84
        $_timeout = $timeout;
567 84
        while (true) {
568 84
            $now = time();
569
            try {
570 84
                list($frame_type, $frame_channel, $payload) = $this->wait_frame($_timeout);
571
            }
572 56
            catch ( AMQPTimeoutException $e ) {
573
                if ( $this->heartbeat && microtime(true) - ($this->heartbeat*2) > $this->last_frame ) {
574
                    $this->debug->debug_msg("missed server heartbeat (at threshold * 2)");
575
                    $this->setIsConnected(false);
576
                    throw new AMQPRuntimeException("Missed server heartbeat");
577
                }
578
579
                throw $e;
580
            }
581
582 84
            $this->last_frame = microtime(true);
583
584 84
            if ($frame_channel === 0 && $frame_type === 8) {
585
                // skip heartbeat frames and reduce the timeout by the time passed
586
                $this->debug->debug_msg("received server heartbeat");
587
                if($_timeout > 0) {
588
                    $_timeout -= time() - $now;
589
                    if($_timeout <= 0) {
590
                        // If timeout has been reached, throw the exception without calling wait_frame
591
                        throw new AMQPTimeoutException("Timeout waiting on channel");
592
                    }
593
                }
594
                continue;
595
596
            } else {
597
598 84
                if ($frame_channel == $channel_id) {
599 84
                    return array($frame_type, $payload);
600
                }
601
602
                // Not the channel we were looking for.  Queue this frame
603
                //for later, when the other channel is looking for frames.
604
                // Make sure the channel still exists, it could have been
605
                // closed by a previous Exception.
606 6
                if (isset($this->channels[$frame_channel])) {
607 6
                    array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
0 ignored issues
show
Bug introduced by
The property frame_queue cannot be accessed from this context as it is declared protected in class PhpAmqpLib\Channel\AbstractChannel.

This check looks for access to properties that are not accessible from the current context.

If you need to make a property accessible to another context you can either raise its visibility level or provide an accessible getter in the defining class.

Loading history...
608 4
                }
609
610
                // If we just queued up a method for channel 0 (the Connection
611
                // itself) it's probably a close method in reaction to some
612
                // error, so deal with it right away.
613 6
                if (($frame_type == 1) && ($frame_channel == 0)) {
614
                    $this->wait();
615
                }
616
            }
617 4
        }
618
    }
619
620
    /**
621
     * Fetches a channel object identified by the numeric channel_id, or
622
     * create that object if it doesn't already exist.
623
     *
624
     * @param int $channel_id
625
     * @return AMQPChannel
626
     */
627 84
    public function channel($channel_id = null)
    {
        if (!isset($this->channels[$channel_id])) {
            // No id given (or a falsy one): pick the first unused channel id.
            $id = $channel_id ?: $this->get_free_channel_id();
            $created = new AMQPChannel($this->connection, $id);
            $this->channels[$id] = $created;

            return $created;
        }

        return $this->channels[$channel_id];
    }
639
640
    /**
641
     * Requests a connection close
642
     *
643
     * @param int $reply_code
644
     * @param string $reply_text
645
     * @param array $method_sig
646
     * @return mixed|null
647
     */
648 84
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        // Heartbeats are switched off only when the transport is a StreamIO.
        if ($this->io instanceof StreamIO) {
            $this->io->disableHeartbeat();
        }

        // Without a protocol writer or a live connection there is nothing to send.
        $can_close = !empty($this->protocolWriter) && $this->isConnected();
        if (!$can_close) {
            return null;
        }

        $this->closeChannels();

        $close_frame = $this->protocolWriter->connectionClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        list($class_id, $method_id, $args) = $close_frame;
        $this->send_method_frame(array($class_id, $method_id), $args);

        $this->setIsConnected(false);

        $expected = array($this->waitHelper->get_wait('connection.close_ok'));

        return $this->wait($expected, false, $this->connection_timeout);
    }
675
676
    /**
677
     * @param AMQPReader $reader
678
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException
679
     */
680
    protected function connection_close(AMQPReader $reader)
    {
        // Fields are read in wire order: code, text, then class id and method id.
        $code = $reader->read_short();
        $text = $reader->read_shortstr();
        $failed_method = array($reader->read_short(), $reader->read_short());

        // Acknowledge the close before reporting it to the caller.
        $this->x_close_ok();

        throw new AMQPProtocolConnectionException($code, $text, $failed_method);
    }
691
692
    /**
693
     * Confirms a connection close
694
     */
695
    protected function x_close_ok()
    {
        // get_wait() yields a comma-separated string; split it into the array
        // form that send_method_frame() is given elsewhere in this class.
        $close_ok_sig = explode(',', $this->waitHelper->get_wait('connection.close_ok'));
        $this->send_method_frame($close_ok_sig);

        $this->do_close();
    }
702
703
    /**
704
     * Confirm a connection close
705
     *
706
     * @param AMQPReader $args
707
     */
708 84
    protected function connection_close_ok($args)
0 ignored issues
show
Unused Code introduced by
The parameter $args is not used and could be removed.

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
709
    {
710 84
        $this->do_close();
711 84
    }
712
713
    /**
714
     * @param string $virtual_host
715
     * @param string $capabilities
716
     * @param bool $insist
717
     * @return mixed
718
     */
719 84
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $payload = new AMQPWriter();
        $payload->write_shortstr($virtual_host);
        $payload->write_shortstr($capabilities);
        $payload->write_bits(array($insist));
        $this->send_method_frame(array(10, 40), $payload);

        $expected = array($this->waitHelper->get_wait('connection.open_ok'));

        // Under protocol version 0.8 a redirect is also accepted as a reply.
        if ($this->protocolVersion == '0.8') {
            $expected[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($expected);
    }
737
738
    /**
739
     * Signals that the connection is ready
740
     *
741
     * @param AMQPReader $args
742
     */
743 84
    protected function connection_open_ok($args)
    {
        $hosts = $args->read_shortstr();
        $this->known_hosts = $hosts;

        $this->debug->debug_msg('Open OK! known_hosts: ' . $hosts);
    }
748
749
    /**
750
     * Asks the client to use a different server
751
     *
752
     * @param AMQPReader $args
753
     * @return string
754
     */
755
    protected function connection_redirect($args)
    {
        // Read order matters: the target host comes first, then the known hosts.
        $redirect_host = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();

        $message = sprintf('Redirected to [%s], known_hosts [%s]', $redirect_host, $this->known_hosts);
        $this->debug->debug_msg($message);

        return $redirect_host;
    }
767
768
    /**
769
     * Security mechanism challenge
770
     *
771
     * @param AMQPReader $args
772
     */
773
    protected function connection_secure($args)
    {
        // The challenge string is still read from $args, but its value is
        // intentionally not kept: nothing in this method uses it, so the
        // previously unused local variable has been dropped.
        $args->read_longstr();
    }
777
778
    /**
779
     * Security mechanism response
780
     *
781
     * @param string $response
782
     */
783
    protected function x_secure_ok($response)
    {
        $payload = new AMQPWriter();
        $payload->write_longstr($response);

        // (10, 21) is the class id / method id pair sent for this reply.
        $this->send_method_frame(array(10, 21), $payload);
    }
789
790
    /**
791
     * Starts connection negotiation
792
     *
793
     * @param AMQPReader $args
794
     */
795 84
    protected function connection_start($args)
796
    {
797 84
        $this->version_major = $args->read_octet();
798 84
        $this->version_minor = $args->read_octet();
799 84
        $this->server_properties = $args->read_table();
0 ignored issues
show
Documentation Bug introduced by
It seems like $args->read_table() can also be of type object<PhpAmqpLib\Wire\AMQPTable>. However, the property $server_properties is declared as type array. Maybe add an additional type check?

Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
800 84
        $this->mechanisms = explode(' ', $args->read_longstr());
801 84
        $this->locales = explode(' ', $args->read_longstr());
802
803 84
        $this->debug->debug_connection_start(
804 84
            $this->version_major,
805 84
            $this->version_minor,
806 84
            $this->server_properties,
0 ignored issues
show
Bug introduced by
It seems like $this->server_properties can also be of type object<PhpAmqpLib\Wire\AMQPTable>; however, PhpAmqpLib\Helper\DebugH...ebug_connection_start() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
807 84
            $this->mechanisms,
808 84
            $this->locales
809 56
        );
810 84
    }
811
812
    /**
     * Sends the client's answer to the start negotiation.
     *
     * Serialises the client properties table, the selected mechanism, the
     * response and the locale, and sends them as method frame (10, 11).
     *
     * @param AMQPTable|array $clientProperties
     * @param string $mechanism
     * @param string $response
     * @param string $locale
     */
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
    {
        $payload = new AMQPWriter();

        $payload->write_table($clientProperties);
        $payload->write_shortstr($mechanism);
        $payload->write_longstr($response);
        $payload->write_shortstr($locale);

        $this->send_method_frame(array(10, 11), $payload);
    }
827
828
    /**
     * Handles the tuning parameters proposed by the server.
     *
     * Non-empty proposals for the channel and frame limits replace the
     * current settings; the heartbeat is only taken from the server when no
     * value has been set on this connection. The resulting values are then
     * sent back through x_tune_ok().
     *
     * @param AMQPReader $args
     */
    protected function connection_tune($args)
    {
        // A falsy (zero/empty) proposal leaves the current setting untouched.
        $proposedChannelMax = $args->read_short();
        if ($proposedChannelMax) {
            $this->channel_max = $proposedChannelMax;
        }

        $proposedFrameMax = $args->read_long();
        if ($proposedFrameMax) {
            $this->frame_max = $proposedFrameMax;
        }

        // use server proposed value if not set
        if (null === $this->heartbeat) {
            $this->heartbeat = $args->read_short();
        }

        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
    }
852
853
    /**
     * Negotiates connection tuning parameters.
     *
     * Sends the three values as method frame (10, 31) and then clears the
     * wait_tune_ok flag.
     *
     * @param int $channel_max
     * @param int $frame_max
     * @param int $heartbeat
     */
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        $payload = new AMQPWriter();

        $payload->write_short($channel_max);
        $payload->write_long($frame_max);
        $payload->write_short($heartbeat);

        $this->send_method_frame(array(10, 31), $payload);

        // Only reached once the frame has been handed off without an exception.
        $this->wait_tune_ok = false;
    }
869
870
    /**
     * Returns whatever the underlying IO object reports as its socket.
     *
     * NOTE(review): the value is passed through unchanged from the IO
     * object's getSocket(); confirm the declared return type against the IO
     * implementations.
     *
     * @return SocketIO
     */
    public function getSocket()
    {
        $io = $this->io;

        return $io->getSocket();
    }
877
878
    /**
     * Returns the IO object this connection uses.
     *
     * @return AbstractIO
     */
    public function getIO()
    {
        return $this->io;
    }
885
886
    /**
     * Handles connection blocked notifications.
     *
     * Reads the reason from the frame arguments and passes it to the
     * registered block handler.
     *
     * @param AMQPReader $args
     */
    protected function connection_blocked(AMQPReader $args)
    {
        // The reason is the only argument handed to the block handler.
        $reason = $args->read_shortstr();

        $this->dispatch_to_handler($this->connection_block_handler, array($reason));
    }
896
897
    /**
     * Handles connection unblocked notifications.
     *
     * Invokes the registered unblock handler without any arguments.
     *
     * @param AMQPReader $args Not read here; presumably kept so the signature
     *                         matches the other frame handlers (verify against
     *                         the dispatcher).
     */
    protected function connection_unblocked(AMQPReader $args)
    {
        // No args to an unblock event
        $noArguments = array();

        $this->dispatch_to_handler($this->connection_unblock_handler, $noArguments);
    }
907
908
    /**
     * Registers the handler that connection_blocked() dispatches to when the
     * server sends a connection blocked notification.
     *
     * @param callable $callback
     */
    public function set_connection_block_handler($callback)
    {
        $this->connection_block_handler = $callback;
    }
917
918
    /**
     * Sets a handler which is called whenever a connection.unblocked is sent from the server
     *
     * The callback is stored as the unblock handler, which
     * connection_unblocked() invokes without arguments.
     *
     * @param callable $callback
     */
    public function set_connection_unblock_handler($callback)
    {
        $this->connection_unblock_handler = $callback;
    }
927
928
    /**
     * Reports the connection status.
     *
     * @return bool The stored is_connected flag, cast to a boolean.
     */
    public function isConnected()
    {
        $connected = (bool) $this->is_connected;

        return $connected;
    }
937
938
    /**
     * Stores the connection status.
     *
     * @param bool $is_connected Cast to a boolean before it is stored.
     */
    protected function setIsConnected($is_connected)
    {
        $connected = (bool) $is_connected;

        $this->is_connected = $connected;
    }
947
948
    /**
     * Closes all available channels.
     *
     * Closing is best effort: an exception raised by one channel is ignored
     * so the remaining channels are still closed.
     */
    protected function closeChannels()
    {
        foreach ($this->channels as $channelId => $openChannel) {
            // channels[0] is this connection object, so don't close it yet
            if ($channelId !== 0) {
                try {
                    $openChannel->close();
                } catch (\Exception $ignored) {
                    /* Ignore closing errors */
                }
            }
        }
    }
965
966
    /**
     * Tells whether the connection should be attempted during construction.
     *
     * @return bool Always true in this class.
     */
    public function connectOnConstruct()
    {
        return true;
    }
975
976
    /**
     * Returns the server properties stored by connection_start().
     *
     * @return array
     */
    public function getServerProperties()
    {
        return $this->server_properties;
    }
983
984
    /**
     * Tries each host definition in turn and returns the first connection
     * that could be created.
     *
     * Every definition is validated with validate_host() before it is used;
     * a validation failure is not caught and aborts the whole call. Failures
     * raised while connecting are remembered, and the most recent one is
     * rethrown once every host has been tried.
     *
     * @param array $hosts   List of host definitions, each with the keys
     *                       'host', 'port', 'user', 'password' and,
     *                       optionally, 'vhost' (defaults to "/").
     * @param array $options Passed through unchanged to try_create_connection().
     *
     * @return mixed Whatever try_create_connection() returns for the first
     *               host that succeeds.
     *
     * @throws \InvalidArgumentException When $hosts is not a non-empty array
     *                                   or a definition lacks a required key.
     * @throws \Exception The last connection failure when no host succeeds.
     */
    public static function create_connection($hosts, $options = array())
    {
        // Without this guard the final "throw" would receive null, which is
        // not a throwable object.
        if (!is_array($hosts) || count($hosts) === 0) {
            throw new \InvalidArgumentException(
                'At least one host definition is required to create a connection.'
            );
        }

        $latest_exception = null;

        // foreach avoids re-evaluating count() on every pass and does not
        // depend on the keys being sequential integers.
        foreach ($hosts as $definition) {
            self::validate_host($definition);

            $vhost = isset($definition['vhost']) ? $definition['vhost'] : "/";

            try {
                // try_create_connection() is not defined in the visible part
                // of this class; it is resolved through late static binding,
                // presumably on the concrete connection subclass.
                return static::try_create_connection(
                    $definition['host'],
                    $definition['port'],
                    $definition['user'],
                    $definition['password'],
                    $vhost,
                    $options
                );
            } catch (\Exception $e) {
                // Remember the failure and move on to the next host.
                $latest_exception = $e;
            }
        }

        throw $latest_exception;
    }
1002
1003
    /**
     * Checks that a host definition carries every required key.
     *
     * The keys are checked in the order 'host', 'port', 'user', 'password';
     * the first one that is missing (or set to null, since isset() is used)
     * is reported.
     *
     * @param array $host
     *
     * @throws \InvalidArgumentException Naming the first missing key.
     */
    public static function validate_host($host)
    {
        $requiredKeys = array('host', 'port', 'user', 'password');

        foreach ($requiredKeys as $requiredKey) {
            if (isset($host[$requiredKey])) {
                continue;
            }

            throw new \InvalidArgumentException(sprintf("'%s' key is required.", $requiredKey));
        }
    }
1017
}
1018