Completed
Push — master ( 8c6650...66cd22 )
by
unknown
29:09 queued 09:12
created

AbstractConnection::wait_channel()   C

Complexity

Conditions 13
Paths 11

Size

Total Lines 56
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 21.5881

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 56
ccs 17
cts 27
cp 0.6296
rs 6.6843
cc 13
eloc 27
nc 11
nop 2
crap 21.5881

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AMQPChannel;
5
use PhpAmqpLib\Channel\AbstractChannel;
6
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPTable;
11
use PhpAmqpLib\Wire\AMQPWriter;
12
use PhpAmqpLib\Wire\IO\AbstractIO;
13
use PhpAmqpLib\Wire\IO\SocketIO;
14
use PhpAmqpLib\Wire\IO\StreamIO;
15
16
class AbstractConnection extends AbstractChannel
17
{
18
    /** @var array */
19
    public static $LIBRARY_PROPERTIES = array(
20
        'product' => array('S', 'AMQPLib'),
21
        'platform' => array('S', 'PHP'),
22
        'version' => array('S', '2.6'),
23
        'information' => array('S', ''),
24
        'copyright' => array('S', ''),
25
        'capabilities' => array(
26
            'F',
27
            array(
28
                'authentication_failure_close' => array('t', true),
29
                'publisher_confirms' => array('t', true),
30
                'consumer_cancel_notify' => array('t', true),
31
                'exchange_exchange_bindings' => array('t', true),
32
                'basic.nack' => array('t', true),
33
                'connection.blocked' => array('t', true)
34
            )
35
        )
36
    );
37
38
    /** @var AMQPChannel[] */
39
    public $channels = array();
40
41
    /** @var int */
42
    protected $version_major;
43
44
    /** @var int */
45
    protected $version_minor;
46
47
    /** @var array */
48
    protected $server_properties;
49
50
    /** @var array */
51
    protected $mechanisms;
52
53
    /** @var array */
54
    protected $locales;
55
56
    /** @var bool */
57
    protected $wait_tune_ok;
58
59
    /** @var string */
60
    protected $known_hosts;
61
62
    /** @var AMQPReader */
63
    protected $input;
64
65
    /** @var string */
66
    protected $vhost;
67
68
    /** @var bool */
69
    protected $insist;
70
71
    /** @var string */
72
    protected $login_method;
73
74
    /** @var string */
75
    protected $login_response;
76
77
    /** @var string */
78
    protected $locale;
79
80
    /** @var int */
81
    protected $heartbeat;
82
83
    /** @var float */
84
    protected $last_frame;
85
86
    /** @var SocketIO */
87
    protected $sock;
88
89
    /** @var int */
90
    protected $channel_max = 65535;
91
92
    /** @var int */
93
    protected $frame_max = 131072;
94
95
     /** @var array Constructor parameters for clone */
96
    protected $construct_params;
97
98
    /** @var bool Close the connection in destructor */
99
    protected $close_on_destruct = true;
100
101
    /** @var bool Maintain connection status */
102
    protected $is_connected = false;
103
104
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
105
    protected $io;
106
107
    /** @var \PhpAmqpLib\Wire\AMQPReader */
108
    protected $wait_frame_reader;
109
110
    /** @var callable Handles connection blocking from the server */
111
    private $connection_block_handler;
112
113
    /** @var callable Handles connection unblocking from the server */
114
    private $connection_unblock_handler;
115
116
    /** @var int Connection timeout value*/
117
    protected $connection_timeout ;
118
119
    /**
120
     * Circular buffer to speed up prepare_content().
121
     * Max size limited by $prepare_content_cache_max_size.
122
     *
123
     * @var array
124
     * @see prepare_content()
125
     */
126
    private $prepare_content_cache;
127
128
    /** @var int Maximal size of $prepare_content_cache */
129
    private $prepare_content_cache_max_size;
130
131
    /**
132
     * @param string $user
133
     * @param string $password
134
     * @param string $vhost
135
     * @param bool $insist
136
     * @param string $login_method
137
     * @param null $login_response
138
     * @param string $locale
139
     * @param AbstractIO $io
140
     * @param int $heartbeat
141 90
     * @param int $connection_timeout
142
     * @throws \Exception
143
     */
144
    public function __construct(
145
        $user,
146
        $password,
147
        $vhost = '/',
148
        $insist = false,
149
        $login_method = 'AMQPLAIN',
150
        $login_response = null,
151
        $locale = 'en_US',
152
        AbstractIO $io,
153
        $heartbeat = 0,
154 90
        $connection_timeout = 0
155
    ) {
156 90
        // save the params for the use of __clone
157 90
        $this->construct_params = func_get_args();
158 90
159 90
        $this->wait_frame_reader = new AMQPReader(null);
160 90
        $this->vhost = $vhost;
161 90
        $this->insist = $insist;
162 90
        $this->login_method = $login_method;
163 90
        $this->login_response = $login_response;
164 90
        $this->locale = $locale;
165
        $this->io = $io;
166 90
        $this->heartbeat = $heartbeat;
167 90
        $this->connection_timeout = $connection_timeout;
168 90
169 90
        if ($user && $password) {
170 90
            $this->login_response = new AMQPWriter();
0 ignored issues
show
Documentation Bug introduced by
It seems like new \PhpAmqpLib\Wire\AMQPWriter() of type object<PhpAmqpLib\Wire\AMQPWriter> is incompatible with the declared type string of property $login_response.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property.

Loading history...
171 45
            $this->login_response->write_table(array(
172
                'LOGIN' => array('S', $user),
173
                'PASSWORD' => array('S', $password)
174 90
            ));
175 90
176
            // Skip the length
177 45
            $responseValue = $this->login_response->getvalue();
178
            $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
179
180
        } else {
181 90
            $this->login_response = null;
182 90
        }
183
184
        $this->prepare_content_cache = array();
185 90
        $this->prepare_content_cache_max_size = 100;
186 66
187 30
        // Lazy Connection waits on connecting
188 84
        if ($this->connectOnConstruct()) {
189
            $this->connect();
190
        }
191
    }
192
193 90
    /**
194
     * Connects to the AMQP server
195
     */
196
    protected function connect()
197 90
    {
198
        try {
199 90
            // Loop until we connect
200
            while (!$this->isConnected()) {
201
                // Assume we will connect, until we dont
202 90
                $this->setIsConnected(true);
203
204 84
                // Connect the socket
205
                $this->getIO()->connect();
206 84
207
                $this->channels = array();
208 84
                // The connection object itself is treated as channel 0
209
                parent::__construct($this, 0);
0 ignored issues
show
Comprehensibility Bug introduced by
It seems like you call parent on a different method (__construct() instead of connect()). Are you sure this is correct? If so, you might want to change this to $this->__construct().

This check looks for a call to a parent method whose name is different than the method from which it is called.

Consider the following code:

class Daddy
{
    protected function getFirstName()
    {
        return "Eidur";
    }

    protected function getSurName()
    {
        return "Gudjohnsen";
    }
}

class Son
{
    public function getFirstName()
    {
        return parent::getSurname();
    }
}

The getFirstName() method in the Son calls the wrong method in the parent class.

Loading history...
210 84
211 84
                $this->input = new AMQPReader(null, $this->getIO());
212 84
213
                $this->write($this->amqp_protocol_header);
214 84
                $this->wait(array($this->waitHelper->get_wait('connection.start')),false,$this->connection_timeout);
215 84
                $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale);
216 84
217 84
                $this->wait_tune_ok = true;
218 84
                while ($this->wait_tune_ok) {
219 42
                    $this->wait(array(
220 42
                        $this->waitHelper->get_wait('connection.secure'),
221
                        $this->waitHelper->get_wait('connection.tune')
222 84
                    ));
223 84
                }
224
225 84
                $host = $this->x_open($this->vhost, '', $this->insist);
226 42
                if (!$host) {
227 36
                    //Reconnected
228 18
                    if ($this->io instanceof StreamIO)
229 84
                    {
230
                        $this->getIO()->reenableHeartbeat();
0 ignored issues
show
Bug introduced by
It seems like you code against a specific sub-type and not the parent class PhpAmqpLib\Wire\IO\AbstractIO as the method reenableHeartbeat() does only exist in the following sub-classes of PhpAmqpLib\Wire\IO\AbstractIO: PhpAmqpLib\Wire\IO\StreamIO. Maybe you want to instanceof check for one of these explicitly?

Let’s take a look at an example:

abstract class User
{
    /** @return string */
    abstract public function getPassword();
}

class MyUser extends User
{
    public function getPassword()
    {
        // return something
    }

    public function getDisplayName()
    {
        // return some name.
    }
}

class AuthSystem
{
    public function authenticate(User $user)
    {
        $this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
        // do something.
    }
}

In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different sub-class of User that does not have a getDisplayName() method, the code will break.

Available Fixes

  1. Change the type-hint for the parameter:

    class AuthSystem
    {
        public function authenticate(MyUser $user) { /* ... */ }
    }
    
  2. Add an additional type-check:

    class AuthSystem
    {
        public function authenticate(User $user)
        {
            if ($user instanceof MyUser) {
                $this->logger->info(/** ... */);
            }
    
            // or alternatively
            if ( ! $user instanceof MyUser) {
                throw new \LogicException(
                    '$user must be an instance of MyUser, '
                   .'other instances are not supported.'
                );
            }
    
        }
    }
    
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types inside the if block in such a case.
  3. Add the method to the parent class:

    abstract class User
    {
        /** @return string */
        abstract public function getPassword();
    
        /** @return string */
        abstract public function getDisplayName();
    }
    
Loading history...
231
                    }
232
                    return null; // we weren't redirected
233
                }
234
235
                $this->setIsConnected(false);
236
                $this->closeChannels();
237
238
                // we were redirected, close the socket, loop and try again
239 18
                $this->close_socket();
240
            }
241 6
242 6
        } catch (\Exception $e) {
243 6
            // Something went wrong, set the connection status
244
            $this->setIsConnected(false);
245 24
            $this->closeChannels();
246
            throw $e; // Rethrow exception
247
        }
248
    }
249
250
    /**
     * Drops the current connection and establishes a new one using the
     * settings this object was originally given.
     *
     * Channels that were open on the old connection are not re-created.
     */
    public function reconnect()
    {
        // Best-effort AMQP-level close; safeClose() swallows any failure.
        $this->safeClose();

        // Bring the underlying socket/stream back up first ...
        $io = $this->getIO();
        $io->reconnect();

        // ... then force the flag back to "disconnected": getIO() may already
        // have changed the connection state (lazy connections), and connect()
        // only runs its handshake loop while the flag is false.
        $this->setIsConnected(false);
        $this->connect();
    }
263
264
    /**
     * Re-runs the constructor on the clone with the arguments captured at
     * construction time, so the copy opens its own connection to the same server.
     */
    public function __clone()
    {
        $constructor = array($this, '__construct');
        call_user_func_array($constructor, $this->construct_params);
    }
271
272
    /**
     * Closes the connection when the object is destroyed, unless that was
     * switched off through set_close_on_destruct(false).
     */
    public function __destruct()
    {
        if (!$this->close_on_destruct) {
            return;
        }

        $this->safeClose();
    }
278
279 36
    /**
     * Closes the connection if an input reader is present, ignoring any error
     * raised while doing so.
     */
    protected function safeClose()
    {
        try {
            // Same test as "isset(...) && ...": set, non-null and truthy.
            if (!empty($this->input)) {
                $this->close();
            }
        } catch (\Exception $ignored) {
            // Deliberately best-effort: this runs from reconnect() and from the
            // destructor, where a failing close must not propagate.
        }
    }
292
293
    /**
     * Forwards a select() call to the IO object.
     *
     * @param int $sec Passed through unchanged as the first argument
     * @param int $usec Passed through unchanged as the second argument
     * @return mixed Whatever the IO object's select() returns
     */
    public function select($sec, $usec = 0)
    {
        $io = $this->getIO();

        return $io->select($sec, $usec);
    }
302
303
    /**
     * Controls whether the destructor closes the connection.
     *
     * Useful after a fork, when the child must not close the connection that
     * the parent process is still using.
     *
     * @param bool $close Any value; it is coerced to a boolean
     */
    public function set_close_on_destruct($close = true)
    {
        $this->close_on_destruct = $close ? true : false;
    }
313 84
314
    /**
     * Closes the input reader, if there is one, and forgets it.
     */
    protected function close_input()
    {
        $this->debug->debug_msg('closing input');

        if ($this->input === null) {
            return;
        }

        $this->input->close();
        $this->input = null;
    }
323 84
324
    /**
     * Closes the IO object, if getIO() yields one.
     */
    protected function close_socket()
    {
        $this->debug->debug_msg('closing socket');

        if ($this->getIO() === null) {
            return;
        }

        $this->getIO()->close();
    }
332
333 84
    /**
     * Hex-dumps the data to the debug helper and hands it to the IO object.
     *
     * @param string $data Bytes to send (callers in this class pass frame data)
     * @throws AMQPRuntimeException Rethrown from the IO layer after the
     *                              connection has been flagged as disconnected
     */
    public function write($data)
    {
        $this->debug->debug_hexdump($data);

        try {
            $io = $this->getIO();
            $io->write($data);
        } catch (AMQPRuntimeException $failure) {
            // A failed write means the link can no longer be trusted.
            $this->setIsConnected(false);

            throw $failure;
        }
    }
347 84
348 84
    /**
     * Tears the connection down locally: flags it as disconnected, then
     * releases the input reader and the socket.
     */
    protected function do_close()
    {
        $this->setIsConnected(false);

        $this->close_input();
        $this->close_socket();
    }
354
355
    /**
     * Finds the lowest channel id in 1..$channel_max that is not in use.
     *
     * @return int
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When every id is taken
     */
    public function get_free_channel_id()
    {
        $candidate = 1;
        while ($candidate <= $this->channel_max) {
            if (!isset($this->channels[$candidate])) {
                return $candidate;
            }
            $candidate++;
        }

        throw new AMQPRuntimeException('No free channel ids');
    }
369
370
    /**
     * Builds the content header and body frames for a message and writes them
     * to the connection.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter|null $pkt Optional writer to append the frames to; a new
     *                             one is created when omitted
     */
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        // prepare_content() creates its own writer when $pkt is null and only
        // hands it back through its return value, so that value must be used;
        // otherwise getvalue() would be called on null.
        $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
        $this->write($pkt->getvalue());
    }
384
385
    /**
     * Serialises a message into one content header frame followed by as many
     * body frames as needed, appending them to $pkt (or to a new writer).
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter $pkt Writer to append to; created when omitted
     * @return AMQPWriter The writer holding the frames
     */
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        $pkt = $pkt ?: new AMQPWriter();

        // The leading part of the header frame depends only on these four
        // values, so it is memoised under a key built from them.
        $cacheKey = sprintf('%s|%s|%s|%s', $channel, $packed_properties, $class_id, $weight);

        if (!isset($this->prepare_content_cache[$cacheKey])) {
            $header = new AMQPWriter();
            $header->write_octet(2); // frame type written for the header frame
            $header->write_short($channel);
            // 12 = class_id (2) + weight (2) + body_size (8), written below
            $header->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $header->write_short($class_id);
            $header->write_short($weight);
            $this->prepare_content_cache[$cacheKey] = $header->getvalue();

            // Keep the cache bounded by dropping its first (oldest) entry.
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                reset($this->prepare_content_cache);
                $oldestKey = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$oldestKey]);
            }
        }

        $pkt->write($this->prepare_content_cache[$cacheKey]);
        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);
        $pkt->write_octet(0xCE); // frame-end marker (checked in wait_frame())

        // Memory efficiency: walk the body with an offset instead of repeatedly
        // cutting it, which matters for bodies close to memory_limit in size.
        // Each body frame carries 8 bytes besides the payload:
        // type (1) + channel (2) + length (4) + frame end (1).
        $chunkSize = $this->frame_max - 8;
        $bodyLength = mb_strlen($body, 'ASCII');

        for ($offset = 0; $offset < $bodyLength; $offset += $chunkSize) {
            $chunk = mb_substr($body, $offset, $chunkSize, 'ASCII');

            $pkt->write_octet(3); // frame type written for body frames
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($chunk, 'ASCII'));
            $pkt->write($chunk);
            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }
452
453
    /**
454
     * @param string $channel
455
     * @param array $method_sig
456 84
     * @param AMQPWriter|string $args
457
     * @param null $pkt
458 84
     */
459 84
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
460 84
    {
461 84
        $pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
462
        $this->write($pkt->getvalue());
463
        $this->debug->debug_method_signature1($method_sig);
0 ignored issues
show
Documentation introduced by
$method_sig is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
464
    }
465
466
    /**
467
     * Returns a new AMQPWriter or mutates the provided $pkt
468
     *
469
     * @param string $channel
470
     * @param array $method_sig
471
     * @param AMQPWriter|string $args
472 84
     * @param AMQPWriter $pkt
473
     * @return AMQPWriter
474 84
     */
475 84
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
476 42
    {
477
        if ($args instanceof AMQPWriter) {
478 84
            $args = $args->getvalue();
479
        }
480 84
481 84
        $pkt = $pkt ?: new AMQPWriter();
482 84
483
        $pkt->write_octet(1);
484
        $pkt->write_short($channel);
485 84
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4); // 4 = length of class_id and method_id
486 84
        // in payload
487 84
488
        $pkt->write_short($method_sig[0]); // class_id
489 84
        $pkt->write_short($method_sig[1]); // method_id
490
        $pkt->write($args);
491 84
492
        $pkt->write_octet(0xCE);
493 84
494
        $this->debug->debug_method_signature1($method_sig);
0 ignored issues
show
Documentation introduced by
$method_sig is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
495
496
        return $pkt;
497
    }
498
499
    /**
     * Reads one complete frame from the server.
     *
     * The reader's timeout is switched to $timeout for the duration of the read
     * and put back afterwards, whether the read succeeds or fails.
     *
     * @param int $timeout Timeout applied to the input reader while reading
     * @return array array($frame_type, $channel, $payload)
     * @throws \Exception
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When there is no input
     *         reader, the frame type is unknown, or the frame-end byte is wrong
     */
    protected function wait_frame($timeout = 0)
    {
        if (is_null($this->input)) {
            $this->setIsConnected(false);
            throw new AMQPRuntimeException('Broken pipe or closed connection');
        }

        $currentTimeout = $this->input->getTimeout();
        $this->input->setTimeout($timeout);

        try {
            // frame_type + channel_id + size
            $this->wait_frame_reader->reuse(
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
            );

            $frame_type = $this->wait_frame_reader->read_octet();
            $class = self::$PROTOCOL_CONSTANTS_CLASS;
            if (!array_key_exists($frame_type, $class::$FRAME_TYPES)) {
                throw new AMQPRuntimeException('Invalid frame type ' . $frame_type);
            }
            $channel = $this->wait_frame_reader->read_short();
            $size = $this->wait_frame_reader->read_long();

            // payload + frame-end byte
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

            $payload = $this->wait_frame_reader->read($size);
            $ch = $this->wait_frame_reader->read_octet();
        } catch (\Exception $e) {
            // Restore the previous timeout on every failure, not only on a
            // timeout; otherwise later reads would silently keep using the
            // temporary per-call value. The reader may have been released in
            // the meantime, hence the guard.
            if (!is_null($this->input)) {
                $this->input->setTimeout($currentTimeout);
            }
            throw $e;
        }

        $this->input->setTimeout($currentTimeout);

        if ($ch != 0xCE) {
            throw new AMQPRuntimeException(sprintf(
                'Framing error, unexpected byte: %x',
                $ch
            ));
        }

        return array($frame_type, $channel, $payload);
    }
555
556
    /**
557
     * Waits for a frame from the server destined for a particular channel.
558
     *
559
     * @param string $channel_id
560 84
     * @param int $timeout
561
     * @return array
562
     */
563 84
    protected function wait_channel($channel_id, $timeout = 0)
564 84
    {
565 84
        // Keeping the original timeout unchanged.
566 84
        $_timeout = $timeout;
567
        while (true) {
568 84
            $now = time();
569
            try {
570
                list($frame_type, $frame_channel, $payload) = $this->wait_frame($_timeout);
571
            }
572
            catch ( AMQPTimeoutException $e ) {
573
                if ( $this->heartbeat && microtime(true) - ($this->heartbeat*2) > $this->last_frame ) {
574
                    $this->debug->debug_msg("missed server heartbeat (at threshold * 2)");
575
                    $this->setIsConnected(false);
576
                    throw new AMQPRuntimeException("Missed server heartbeat");
577
                }
578
579
                throw $e;
580
            }
581 84
582 84
            $this->last_frame = microtime(true);
583
584
            if ($frame_channel === 0 && $frame_type === 8) {
585
                // skip heartbeat frames and reduce the timeout by the time passed
586
                $this->debug->debug_msg("received server heartbeat");
587
                if($_timeout > 0) {
588
                    $_timeout -= time() - $now;
589 6
                    if($_timeout <= 0) {
590 6
                        // If timeout has been reached, throw the exception without calling wait_frame
591 3
                        throw new AMQPTimeoutException("Timeout waiting on channel");
592
                    }
593
                }
594
                continue;
595
596 6
            } else {
597
598
                if ($frame_channel == $channel_id) {
599
                    return array($frame_type, $payload);
600 3
                }
601
602
                // Not the channel we were looking for.  Queue this frame
603
                //for later, when the other channel is looking for frames.
604
                // Make sure the channel still exists, it could have been
605
                // closed by a previous Exception.
606
                if (isset($this->channels[$frame_channel])) {
607
                    array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
0 ignored issues
show
Bug introduced by
The property frame_queue cannot be accessed from this context as it is declared protected in class PhpAmqpLib\Channel\AbstractChannel.

This check looks for access to properties that are not accessible from the current context.

If you need to make a property accessible to another context you can either raise its visibility level or provide an accessible getter in the defining class.

Loading history...
608
                }
609
610 84
                // If we just queued up a method for channel 0 (the Connection
611
                // itself) it's probably a close method in reaction to some
612 84
                // error, so deal with it right away.
613
                if (($frame_type == 1) && ($frame_channel == 0)) {
614
                    $this->wait();
615
                }
616 84
            }
617 84
        }
618 84
    }
619
620 84
    /**
     * Returns the channel registered under $channel_id, creating and
     * registering a new one when none exists yet.
     *
     * @param int $channel_id Desired id; when empty, the next free id is used
     * @return AMQPChannel
     */
    public function channel($channel_id = null)
    {
        if (isset($this->channels[$channel_id])) {
            return $this->channels[$channel_id];
        }

        $channel_id = $channel_id ?: $this->get_free_channel_id();

        $created = new AMQPChannel($this->connection, $channel_id);
        $this->channels[$channel_id] = $created;

        return $created;
    }
639 6
640
    /**
     * Asks the server to close the connection and waits for its confirmation.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig array(class_id, method_id) passed on to the
     *                          connection-close method
     * @return mixed|null Null when there is nothing to close, otherwise the
     *                    result of waiting for connection.close_ok
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        // Heartbeats are switched off first, even if we turn out not to be connected.
        if ($this->io instanceof StreamIO) {
            $this->io->disableHeartbeat();
        }

        if (empty($this->protocolWriter) || !$this->isConnected()) {
            return null;
        }

        $this->closeChannels();

        $closeFrame = $this->protocolWriter->connectionClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        list($class_id, $method_id, $args) = $closeFrame;
        $this->send_method_frame(array($class_id, $method_id), $args);

        $this->setIsConnected(false);

        $expected = array($this->waitHelper->get_wait('connection.close_ok'));

        return $this->wait($expected, false, $this->connection_timeout);
    }
675
676
    /**
     * Handles a close request from the server: confirms it, shuts down locally
     * and reports the reason to the caller as an exception.
     *
     * @param AMQPReader $reader Arguments of the received close method
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException Always
     */
    protected function connection_close(AMQPReader $reader)
    {
        // Fields are consumed in the order they are read here:
        // reply code, reply text, class id, method id.
        $reply_code = $reader->read_short();
        $reply_text = $reader->read_shortstr();
        $failed_method = array($reader->read_short(), $reader->read_short());

        $this->x_close_ok();

        throw new AMQPProtocolConnectionException($reply_code, $reply_text, $failed_method);
    }
691 84
692
    /**
     * Sends connection.close_ok to the server, then closes the local side.
     */
    protected function x_close_ok()
    {
        // The wait helper yields the signature as a comma-separated string.
        $signature = explode(',', $this->waitHelper->get_wait('connection.close_ok'));

        $this->send_method_frame($signature);
        $this->do_close();
    }
702 84
703
    /**
704 84
     * Confirm a connection close
705 84
     *
706 84
     * @param AMQPReader $args
707 84
     */
708 84
    protected function connection_close_ok($args)
0 ignored issues
show
Unused Code introduced by
The parameter $args is not used and could be removed.

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
709
    {
710
        $this->do_close();
711 84
    }
712 42
713
    /**
     * Sends the open method for a virtual host and waits for the server's answer.
     *
     * @param string $virtual_host
     * @param string $capabilities
     * @param bool $insist
     * @return mixed Result of wait(); connect() treats a falsy value as
     *               "not redirected"
     */
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $payload = new AMQPWriter();
        $payload->write_shortstr($virtual_host);
        $payload->write_shortstr($capabilities);
        $payload->write_bits(array($insist));
        $this->send_method_frame(array(10, 40), $payload);

        $expected = array($this->waitHelper->get_wait('connection.open_ok'));

        // A redirect is additionally accepted as an answer under protocol 0.8.
        if ($this->protocolVersion == '0.8') {
            $expected[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($expected);
    }
737
738
    /**
     * Handles the server's open-ok: stores the known-hosts string it carries
     * and logs it.
     *
     * @param AMQPReader $args
     */
    protected function connection_open_ok($args)
    {
        $known_hosts = $args->read_shortstr();

        $this->known_hosts = $known_hosts;
        $this->debug->debug_msg('Open OK! known_hosts: ' . $known_hosts);
    }
748
749
    /**
     * Handles a redirect from the server: reads the target host and the
     * known-hosts string, stores the latter, and logs both.
     *
     * @param AMQPReader $args
     * @return string The host the server asked us to use instead
     */
    protected function connection_redirect($args)
    {
        $host = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();

        $message = sprintf('Redirected to [%s], known_hosts [%s]', $host, $this->known_hosts);
        $this->debug->debug_msg($message);

        return $host;
    }
767
768
    /**
769
     * Security mechanism challenge
770
     *
771
     * @param AMQPReader $args
772
     */
773
    protected function connection_secure($args)
774
    {
775
        $challenge = $args->read_longstr();
0 ignored issues
show
Unused Code introduced by
$challenge is not used, you could remove the assignment.

This check looks for variable assignments that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
776
    }
777
778 84
    /**
     * Sends the response to a security challenge.
     *
     * @param string $response
     */
    protected function x_secure_ok($response)
    {
        $payload = new AMQPWriter();
        $payload->write_longstr($response);

        $this->send_method_frame(array(10, 21), $payload);
    }
789 84
790 84
    /**
791 84
     * Starts connection negotiation
792 42
     *
793 84
     * @param AMQPReader $args
794
     */
795
    protected function connection_start($args)
796
    {
797
        $this->version_major = $args->read_octet();
798
        $this->version_minor = $args->read_octet();
799
        $this->server_properties = $args->read_table();
0 ignored issues
show
Documentation Bug introduced by
It seems like $args->read_table() can also be of type object<PhpAmqpLib\Wire\AMQPTable>. However, the property $server_properties is declared as type array. Maybe add an additional type check?

Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
800
        $this->mechanisms = explode(' ', $args->read_longstr());
801 84
        $this->locales = explode(' ', $args->read_longstr());
802
803 84
        $this->debug->debug_connection_start(
804 84
            $this->version_major,
805 84
            $this->version_minor,
806 84
            $this->server_properties,
0 ignored issues
show
Bug introduced by
It seems like $this->server_properties can also be of type object<PhpAmqpLib\Wire\AMQPTable>; however, PhpAmqpLib\Helper\DebugH...ebug_connection_start() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
807 84
            $this->mechanisms,
808 84
            $this->locales
809 84
        );
810
    }
811
812
    /**
     * Sends the client's reply to the server's start of negotiation.
     *
     * The four arguments are written in the order table, short string,
     * long string, short string, and sent as method frame (10, 11).
     *
     * @param AMQPTable|array $clientProperties
     * @param string $mechanism
     * @param string $response
     * @param string $locale
     */
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
    {
        $method = array(10, 11);

        $payload = new AMQPWriter();
        $payload->write_table($clientProperties);
        $payload->write_shortstr($mechanism);
        $payload->write_longstr($response);
        $payload->write_shortstr($locale);

        $this->send_method_frame($method, $payload);
    }
827
828
    /**
829 84
     * Proposes connection tuning parameters
830
     *
     * Reads the server's proposed channel maximum (short) and frame maximum
     * (long) from $args. A truthy value replaces the local setting; a falsy
     * one leaves it unchanged. The server's heartbeat is read only when no
     * heartbeat is set locally (strictly null). The resulting values are
     * then sent back through x_tune_ok().
     *
831
     * @param AMQPReader $args
832
     */
833 84
    protected function connection_tune($args)
834 84
    {
835
        $v = $args->read_short();
836
        // A falsy value (e.g. 0) keeps the current channel_max.
        if ($v) {
837
            $this->channel_max = $v;
838
        }
839
840
        $v = $args->read_long();
841
        // Same rule for frame_max: only a truthy proposal is adopted.
        if ($v) {
842
            $this->frame_max = $v;
0 ignored issues
show
Documentation Bug introduced by
The property $frame_max was declared of type integer, but $v is of type string. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
843 84
        }
844
845 84
        // use server proposed value if not set
        // NOTE(review): when a heartbeat is already set, the server's
        // heartbeat field is never read from $args.
846 84
        if ($this->heartbeat === null) {
847 84
            $this->heartbeat = $args->read_short();
848 84
        }
849 84
850 84
        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
851 84
    }
852
853
    /**
     * Sends the negotiated tuning parameters to the server.
     *
     * Writes channel max (short), frame max (long) and heartbeat (short),
     * sends them as method frame (10, 31), and then clears the
     * wait_tune_ok flag.
     *
     * @param int $channel_max
     * @param int $frame_max
     * @param int $heartbeat
     */
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        $method = array(10, 31);

        $payload = new AMQPWriter();
        $payload->write_short($channel_max);
        $payload->write_long($frame_max);
        $payload->write_short($heartbeat);

        $this->send_method_frame($method, $payload);

        // Cleared only after the frame has been handed to send_method_frame().
        $this->wait_tune_ok = false;
    }
869
870
    /**
     * Returns the socket exposed by the underlying IO object.
     *
     * NOTE(review): the return tag is carried over unchanged; the value is
     * whatever $this->io->getSocket() yields — confirm it really is a SocketIO.
     *
     * @return SocketIO
     */
    public function getSocket()
    {
        $io = $this->io;

        return $io->getSocket();
    }
877
878
    /**
     * Gives this class and its subclasses access to the connection's IO object.
     *
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
     */
    protected function getIO()
    {
        $io = $this->io;

        return $io;
    }
885
886
    /**
     * Handles connection blocked notifications
     *
     * Reads the reason (a short string) from the arguments and passes it as
     * the only argument to the registered block handler.
     *
     * @param AMQPReader $args
     */
    protected function connection_blocked(AMQPReader $args)
    {
        $reason = $args->read_shortstr();

        $this->dispatch_to_handler($this->connection_block_handler, array($reason));
    }
896
897
    /**
898
     * Handles connection unblocked notifications
899
     *
     * Calls the registered unblock handler with an empty argument list.
     * $args is accepted but never read; presumably it is kept so the
     * signature matches the other connection_* handlers — confirm before
     * removing it.
     *
900
     * @param AMQPReader $args
901
     */
902
    protected function connection_unblocked(AMQPReader $args)
0 ignored issues
show
Unused Code introduced by
The parameter $args is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
903
    {
904
        // No args to an unblock event
905
        $this->dispatch_to_handler($this->connection_unblock_handler, array());
906
    }
907
908
    /**
     * Registers the callback that connection_blocked() dispatches to.
     *
     * The callback is invoked with one argument: the reason string read from
     * the blocked notification.
     *
     * @param callable $callback
     */
    public function set_connection_block_handler($callback)
    {
        $this->connection_block_handler = $callback;
    }
917
918 90
    /**
     * Sets a handler which is called whenever a connection.unblocked
     * notification is sent from the server.
     *
     * The handler is dispatched from connection_unblocked() and is called
     * with no arguments.
     *
     * @param callable $callback
     */
    public function set_connection_unblock_handler($callback)
    {
        $this->connection_unblock_handler = $callback;
    }
927
928 90
    /**
     * Reports whether this connection is currently flagged as connected.
     *
     * @return bool the stored connection flag, cast to boolean
     */
    public function isConnected()
    {
        $connected = (bool) $this->is_connected;

        return $connected;
    }
937
938 84
    /**
     * Stores the connection status flag.
     *
     * The value is cast to boolean before it is stored.
     *
     * @param bool $is_connected
     */
    protected function setIsConnected($is_connected)
    {
        $connected = (bool) $is_connected;

        $this->is_connected = $connected;
    }
947 90
948
    /**
     * Closes every registered channel except the connection itself.
     *
     * Any exception thrown while closing a channel is swallowed so that the
     * remaining channels are still closed.
     */
    protected function closeChannels()
    {
        foreach ($this->channels as $channelId => $openChannel) {
            // channels[0] is this connection object, so it is left open here
            if ($channelId !== 0) {
                try {
                    $openChannel->close();
                } catch (\Exception $ignored) {
                    /* Ignore closing errors */
                }
            }
        }
    }
965
966
    /**
     * Tells whether the connection should be attempted during construction.
     *
     * This implementation always answers yes.
     *
     * @return bool
     */
    public function connectOnConstruct()
    {
        $connectImmediately = true;

        return $connectImmediately;
    }
975
976
    /**
     * Returns the server properties stored by connection_start().
     *
     * @return array
     */
    public function getServerProperties()
    {
        $properties = $this->server_properties;

        return $properties;
    }
983
}
984