Completed
Push — master ( 9c0fac...fa2f0d )
by John
07:11
created

AbstractConnection   D

Complexity

Total Complexity 91

Size/Duplication

Total Lines 940
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 10

Test Coverage

Coverage 74.6%

Importance

Changes 26
Bugs 5 Features 2
Metric Value
wmc 91
c 26
b 5
f 2
lcom 2
cbo 10
dl 0
loc 940
ccs 279
cts 374
cp 0.746
rs 4.4444

44 Methods

Rating   Name   Duplication   Size   Complexity  
A __destruct() 0 6 2
A set_connection_block_handler() 0 4 1
A __clone() 0 4 1
A connection_close() 0 11 1
A reconnect() 0 9 1
A connection_close_ok() 0 4 1
A x_start_ok() 0 9 1
A connection_blocked() 0 5 1
A isConnected() 0 4 1
A connectOnConstruct() 0 4 1
A getServerProperties() 0 4 1
B __construct() 0 46 4
A safeClose() 0 10 4
A select() 0 4 1
A set_close_on_destruct() 0 4 1
A close_input() 0 9 2
A close_socket() 0 8 2
A write() 0 11 2
A do_close() 0 6 1
A get_free_channel_id() 0 10 3
A send_content() 0 5 1
B prepare_content() 0 55 5
A send_channel_method_frame() 0 6 1
A prepare_channel_method_frame() 0 23 3
A connection_unblocked() 0 5 1
A setIsConnected() 0 4 1
B connect() 0 48 5
B wait_frame() 0 47 5
D wait_channel() 0 42 10
A channel() 0 12 3
B close() 0 27 4
A x_close_ok() 0 7 1
A x_open() 0 18 2
A connection_open_ok() 0 5 1
A connection_redirect() 0 12 1
A connection_secure() 0 4 1
A x_secure_ok() 0 6 1
A connection_start() 0 16 1
A connection_tune() 0 19 4
A x_tune_ok() 0 9 1
A getSocket() 0 4 1
A getIO() 0 4 1
A set_connection_unblock_handler() 0 4 1
A closeChannels() 0 14 4

How to fix   Complexity   

Complex Class

Complex classes like AbstractConnection often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use AbstractConnection, and based on these observations, apply Extract Interface, too.

1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AMQPChannel;
5
use PhpAmqpLib\Channel\AbstractChannel;
6
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPTable;
11
use PhpAmqpLib\Wire\AMQPWriter;
12
use PhpAmqpLib\Wire\IO\AbstractIO;
13
use PhpAmqpLib\Wire\IO\SocketIO;
14
use PhpAmqpLib\Wire\IO\StreamIO;
15
16
class AbstractConnection extends AbstractChannel
17
{
18
    /** @var array */
19
    public static $LIBRARY_PROPERTIES = array(
20
        'product' => array('S', 'AMQPLib'),
21
        'platform' => array('S', 'PHP'),
22
        'version' => array('S', '2.6'),
23
        'information' => array('S', ''),
24
        'copyright' => array('S', ''),
25
        'capabilities' => array(
26
            'F',
27
            array(
28
                'authentication_failure_close' => array('t', true),
29
                'publisher_confirms' => array('t', true),
30
                'consumer_cancel_notify' => array('t', true),
31
                'exchange_exchange_bindings' => array('t', true),
32
                'basic.nack' => array('t', true),
33
                'connection.blocked' => array('t', true)
34
            )
35
        )
36
    );
37
38
    /** @var AMQPChannel[] */
39
    public $channels = array();
40
41
    /** @var int */
42
    protected $version_major;
43
44
    /** @var int */
45
    protected $version_minor;
46
47
    /** @var array */
48
    protected $server_properties;
49
50
    /** @var array */
51
    protected $mechanisms;
52
53
    /** @var array */
54
    protected $locales;
55
56
    /** @var bool */
57
    protected $wait_tune_ok;
58
59
    /** @var string */
60
    protected $known_hosts;
61
62
    /** @var AMQPReader */
63
    protected $input;
64
65
    /** @var string */
66
    protected $vhost;
67
68
    /** @var bool */
69
    protected $insist;
70
71
    /** @var string */
72
    protected $login_method;
73
74
    /** @var string */
75
    protected $login_response;
76
77
    /** @var string */
78
    protected $locale;
79
80
    /** @var int */
81
    protected $heartbeat;
82
83
    /** @var SocketIO */
84
    protected $sock;
85
86
    /** @var int */
87
    protected $channel_max = 65535;
88
89
    /** @var int */
90
    protected $frame_max = 131072;
91
92
     /** @var array Constructor parameters for clone */
93
    protected $construct_params;
94
95
    /** @var bool Close the connection in destructor */
96
    protected $close_on_destruct = true;
97
98
    /** @var bool Maintain connection status */
99
    protected $is_connected = false;
100
101
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
102
    protected $io;
103
104
    /** @var \PhpAmqpLib\Wire\AMQPReader */
105
    protected $wait_frame_reader;
106
107
    /** @var callable Handles connection blocking from the server */
108
    private $connection_block_handler;
109
110
    /** @var callable Handles connection unblocking from the server */
111
    private $connection_unblock_handler;
112
113
    /**
114
     * Circular buffer to speed up prepare_content().
115
     * Max size limited by $prepare_content_cache_max_size.
116
     *
117
     * @var array
118
     * @see prepare_content()
119
     */
120
    private $prepare_content_cache;
121
122
    /** @var int Maximal size of $prepare_content_cache */
123
    private $prepare_content_cache_max_size;
124
125
    /**
126
     * @param string $user
127
     * @param string $password
128
     * @param string $vhost
129
     * @param bool $insist
130
     * @param string $login_method
131
     * @param null $login_response
132
     * @param string $locale
133
     * @param AbstractIO $io
134
     * @param int $heartbeat
135
     * @throws \Exception
136
     */
137 84
    public function __construct(
138
        $user,
139
        $password,
140
        $vhost = '/',
141
        $insist = false,
142
        $login_method = 'AMQPLAIN',
143
        $login_response = null,
144
        $locale = 'en_US',
145
        AbstractIO $io,
146
        $heartbeat = 0
147
    ) {
148
        // save the params for the use of __clone
149 84
        $this->construct_params = func_get_args();
150
151 84
        $this->wait_frame_reader = new AMQPReader(null);
152 84
        $this->vhost = $vhost;
153 84
        $this->insist = $insist;
154 84
        $this->login_method = $login_method;
155 84
        $this->login_response = $login_response;
156 84
        $this->locale = $locale;
157 84
        $this->io = $io;
158 84
        $this->heartbeat = $heartbeat;
159
160 84
        if ($user && $password) {
161 84
            $this->login_response = new AMQPWriter();
0 ignored issues
show
Documentation Bug introduced by
It seems like new \PhpAmqpLib\Wire\AMQPWriter() of type object<PhpAmqpLib\Wire\AMQPWriter> is incompatible with the declared type string of property $login_response.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
162 84
            $this->login_response->write_table(array(
163 84
                'LOGIN' => array('S', $user),
164 84
                'PASSWORD' => array('S', $password)
165 70
            ));
166
167
            // Skip the length
168 84
            $responseValue = $this->login_response->getvalue();
169 84
            $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
170
171 70
        } else {
172
            $this->login_response = null;
173
        }
174
175 84
        $this->prepare_content_cache = array();
176 84
        $this->prepare_content_cache_max_size = 100;
177
178
        // Lazy Connection waits on connecting
179 84
        if ($this->connectOnConstruct()) {
180 60
            $this->connect();
181 45
        }
182 78
    }
183
184
    /**
185
     * Connects to the AMQP server
186
     */
187 84
    protected function connect()
188
    {
189
        try {
190
            // Loop until we connect
191 84
            while (!$this->isConnected()) {
192
                // Assume we will connect, until we dont
193 84
                $this->setIsConnected(true);
194
195
                // Connect the socket
196 84
                $this->getIO()->connect();
197
198 78
                $this->channels = array();
199
                // The connection object itself is treated as channel 0
200 78
                parent::__construct($this, 0);
0 ignored issues
show
Comprehensibility Bug introduced by
It seems like you call parent on a different method (__construct() instead of connect()). Are you sure this is correct? If so, you might want to change this to $this->__construct().

This check looks for a call to a parent method whose name is different than the method from which it is called.

Consider the following code:

class Daddy
{
    protected function getFirstName()
    {
        return "Eidur";
    }

    protected function getSurName()
    {
        return "Gudjohnsen";
    }
}

class Son
{
    public function getFirstName()
    {
        return parent::getSurname();
    }
}

The getFirstName() method in the Son calls the wrong method in the parent class.

Loading history...
201
202 78
                $this->input = new AMQPReader(null, $this->getIO());
203
204 78
                $this->write($this->amqp_protocol_header);
205 78
                $this->wait(array($this->waitHelper->get_wait('connection.start')));
206 78
                $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale);
207
208 78
                $this->wait_tune_ok = true;
209 78
                while ($this->wait_tune_ok) {
210 78
                    $this->wait(array(
211 78
                        $this->waitHelper->get_wait('connection.secure'),
212 78
                        $this->waitHelper->get_wait('connection.tune')
213 65
                    ));
214 65
                }
215
216 78
                $host = $this->x_open($this->vhost, '', $this->insist);
217 78
                if (!$host) {
218 78
                    return null; // we weren't redirected
219
                }
220
221
                $this->setIsConnected(false);
222
                $this->closeChannels();
223
224
                // we were redirected, close the socket, loop and try again
225
                $this->close_socket();
226
            }
227
228 26
        } catch (\Exception $e) {
229
            // Something went wrong, set the connection status
230 6
            $this->setIsConnected(false);
231 6
            $this->closeChannels();
232 6
            throw $e; // Rethrow exception
233
        }
234 24
    }
235
236
    /**
237
     * Reconnects using the original connection settings.
238
     * This will not recreate any channels that were established previously
239
     */
240 36
    public function reconnect()
241
    {
242
        // Try to close the AMQP connection
243 36
        $this->safeClose();
244
        // Reconnect the socket/stream then AMQP
245 36
        $this->getIO()->reconnect();
246 36
        $this->setIsConnected(false); // getIO can initiate the connection setting via LazyConnection, set it here to be sure
247 36
        $this->connect();
248 36
    }
249
250
    /**
251
     * Cloning will use the old properties to make a new connection to the same server
252
     */
253
    public function __clone()
254
    {
255
        call_user_func_array(array($this, '__construct'), $this->construct_params);
256
    }
257
258
    public function __destruct()
259
    {
260
        if ($this->close_on_destruct) {
261
            $this->safeClose();
262
        }
263
    }
264
265
    /**
266
     * Attempts to close the connection safely
267
     */
268 36
    protected function safeClose()
269
    {
270
        try {
271 36
            if (isset($this->input) && $this->input) {
272 21
                $this->close();
273 15
            }
274 30
        } catch (\Exception $e) {
275
            // Nothing here
276
        }
277 36
    }
278
279
    /**
280
     * @param int $sec
281
     * @param int $usec
282
     * @return mixed
283
     */
284
    public function select($sec, $usec = 0)
285
    {
286
        return $this->getIO()->select($sec, $usec);
287
    }
288
289
    /**
290
     * Allows to not close the connection
291
     * it's useful after the fork when you don't want to close parent process connection
292
     *
293
     * @param bool $close
294
     */
295
    public function set_close_on_destruct($close = true)
296
    {
297
        $this->close_on_destruct = (bool) $close;
298
    }
299
300 78
    protected function close_input()
301
    {
302 78
        $this->debug->debug_msg('closing input');
303
304 78
        if (!is_null($this->input)) {
305 78
            $this->input->close();
306 78
            $this->input = null;
307 65
        }
308 78
    }
309
310 78
    protected function close_socket()
311
    {
312 78
        $this->debug->debug_msg('closing socket');
313
314 78
        if (!is_null($this->getIO())) {
315 78
            $this->getIO()->close();
316 65
        }
317 78
    }
318
319
    /**
320
     * @param $data
321
     */
322 78
    public function write($data)
323
    {
324 78
        $this->debug->debug_hexdump($data);
325
326
        try {
327 78
            $this->getIO()->write($data);
328 65
        } catch (AMQPRuntimeException $e) {
329
            $this->setIsConnected(false);
330
            throw $e;
331
        }
332 78
    }
333
334 78
    protected function do_close()
335
    {
336 78
        $this->setIsConnected(false);
337 78
        $this->close_input();
338 78
        $this->close_socket();
339 78
    }
340
341
    /**
342
     * @return int
343
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
344
     */
345 78
    public function get_free_channel_id()
346
    {
347 78
        for ($i = 1; $i <= $this->channel_max; $i++) {
348 78
            if (!isset($this->channels[$i])) {
349 78
                return $i;
350
            }
351 15
        }
352
353
        throw new AMQPRuntimeException('No free channel ids');
354
    }
355
356
    /**
357
     * @param string $channel
358
     * @param int $class_id
359
     * @param int $weight
360
     * @param int $body_size
361
     * @param string $packed_properties
362
     * @param string $body
363
     * @param AMQPWriter $pkt
364
     */
365 66
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
366
    {
367 66
        $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
368 66
        $this->write($pkt->getvalue());
0 ignored issues
show
Bug introduced by
It seems like $pkt is not always an object, but can also be of type null. Maybe add an additional type check?

If a variable is not always an object, we recommend to add an additional type check to ensure your method call is safe:

function someFunction(A $objectMaybe = null)
{
    if ($objectMaybe instanceof A) {
        $objectMaybe->doSomething();
    }
}
Loading history...
369 66
    }
370
371
    /**
372
     * Returns a new AMQPWriter or mutates the provided $pkt
373
     *
374
     * @param string $channel
375
     * @param int $class_id
376
     * @param int $weight
377
     * @param int $body_size
378
     * @param string $packed_properties
379
     * @param string $body
380
     * @param AMQPWriter $pkt
381
     * @return AMQPWriter
382
     */
383 66
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
384
    {
385 66
        $pkt = $pkt ?: new AMQPWriter();
386
387
        // Content already prepared ?
388 66
        $key_cache = sprintf(
389 66
            '%s|%s|%s|%s',
390 55
            $channel,
391 55
            $packed_properties,
392 55
            $class_id,
393
            $weight
394 55
        );
395
396 66
        if (!isset($this->prepare_content_cache[$key_cache])) {
397 66
            $w = new AMQPWriter();
398 66
            $w->write_octet(2);
399 66
            $w->write_short($channel);
400 66
            $w->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
401 66
            $w->write_short($class_id);
402 66
            $w->write_short($weight);
403 66
            $this->prepare_content_cache[$key_cache] = $w->getvalue();
404 66
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
405
                reset($this->prepare_content_cache);
406
                $old_key = key($this->prepare_content_cache);
407
                unset($this->prepare_content_cache[$old_key]);
408
            }
409 55
        }
410 66
        $pkt->write($this->prepare_content_cache[$key_cache]);
411
412 66
        $pkt->write_longlong($body_size);
413 66
        $pkt->write($packed_properties);
414
415 66
        $pkt->write_octet(0xCE);
416
417
418
        // memory efficiency: walk the string instead of biting
419
        // it. good for very large packets (close in size to
420
        // memory_limit setting)
421 66
        $position = 0;
422 66
        $bodyLength = mb_strlen($body,'ASCII');
423 66
        while ($position < $bodyLength) {
424 60
            $payload = mb_substr($body, $position, $this->frame_max - 8, 'ASCII');
425 60
            $position += $this->frame_max - 8;
426
427 60
            $pkt->write_octet(3);
428 60
            $pkt->write_short($channel);
429 60
            $pkt->write_long(mb_strlen($payload, 'ASCII'));
430
431 60
            $pkt->write($payload);
432
433 60
            $pkt->write_octet(0xCE);
434 50
        }
435
436 66
        return $pkt;
437
    }
438
439
    /**
440
     * @param string $channel
441
     * @param array $method_sig
442
     * @param AMQPWriter|string $args
443
     * @param null $pkt
444
     */
445 78
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
446
    {
447 78
        $pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
448 78
        $this->write($pkt->getvalue());
449 78
        $this->debug->debug_method_signature1($method_sig);
0 ignored issues
show
Documentation introduced by
$method_sig is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
450 78
    }
451
452
    /**
453
     * Returns a new AMQPWriter or mutates the provided $pkt
454
     *
455
     * @param string $channel
456
     * @param array $method_sig
457
     * @param AMQPWriter|string $args
458
     * @param AMQPWriter $pkt
459
     * @return AMQPWriter
460
     */
461 78
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
462
    {
463 78
        if ($args instanceof AMQPWriter) {
464 78
            $args = $args->getvalue();
465 65
        }
466
467 78
        $pkt = $pkt ?: new AMQPWriter();
468
469 78
        $pkt->write_octet(1);
470 78
        $pkt->write_short($channel);
471 78
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4); // 4 = length of class_id and method_id
472
        // in payload
473
474 78
        $pkt->write_short($method_sig[0]); // class_id
475 78
        $pkt->write_short($method_sig[1]); // method_id
476 78
        $pkt->write($args);
477
478 78
        $pkt->write_octet(0xCE);
479
480 78
        $this->debug->debug_method_signature1($method_sig);
0 ignored issues
show
Documentation introduced by
$method_sig is of type array, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
481
482 78
        return $pkt;
483
    }
484
485
    /**
486
     * Waits for a frame from the server
487
     *
488
     * @param int $timeout
489
     * @return array
490
     * @throws \Exception
491
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
492
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
493
     */
494 78
    protected function wait_frame($timeout = 0)
495
    {
496 78
        if (is_null($this->input))
497 65
        {
498
            $this->setIsConnected(false);
499
            throw new AMQPRuntimeException('Broken pipe or closed connection');
500
        }
501
502 78
        $currentTimeout = $this->input->getTimeout();
503 78
        $this->input->setTimeout($timeout);
504
505
        try {
506
            // frame_type + channel_id + size
507 78
            $this->wait_frame_reader->reuse(
508 78
                $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
509 65
            );
510
511 78
            $frame_type = $this->wait_frame_reader->read_octet();
512 78
            $class = self::$PROTOCOL_CONSTANTS_CLASS;
513 78
            if (!array_key_exists($frame_type, $class::$FRAME_TYPES)) {
514
                throw new AMQPRuntimeException('Invalid frame type ' . $frame_type);
515
            }
516 78
            $channel = $this->wait_frame_reader->read_short();
517 78
            $size = $this->wait_frame_reader->read_long();
518
519
            // payload + ch
520 78
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));
521
522 78
            $payload = $this->wait_frame_reader->read($size);
523 78
            $ch = $this->wait_frame_reader->read_octet();
524
525 65
        } catch (AMQPTimeoutException $e) {
526
            $this->input->setTimeout($currentTimeout);
527
            throw $e;
528
        }
529
530 78
        $this->input->setTimeout($currentTimeout);
531
532 78
        if ($ch != 0xCE) {
533
            throw new AMQPRuntimeException(sprintf(
534
                'Framing error, unexpected byte: %x',
535
                $ch
536
            ));
537
        }
538
539 78
        return array($frame_type, $channel, $payload);
540
    }
541
542
    /**
543
     * Waits for a frame from the server destined for a particular channel.
544
     *
545
     * @param string $channel_id
546
     * @param int $timeout
547
     * @return array
548
     */
549 78
    protected function wait_channel($channel_id, $timeout = 0)
550
    {
551
        // Keeping the original timeout unchanged.
552 78
        $_timeout = $timeout;
553 78
        while (true) {
554 78
            $now = time();
555 78
            list($frame_type, $frame_channel, $payload) = $this->wait_frame($_timeout);
556
557 78
            if ($frame_channel === 0 && $frame_type === 8) {
558
                // skip heartbeat frames and reduce the timeout by the time passed
559
                if($_timeout > 0) {
560
                    $_timeout -= time() - $now;
561
                    if($_timeout <= 0) {
562
                        // If timeout has been reached, throw the exception without calling wait_frame
563
                        throw new AMQPTimeoutException("Timeout waiting on channel");
564
                    }
565
                }
566
                continue;
567
568
            } else {
569
570 78
                if ($frame_channel == $channel_id) {
571 78
                    return array($frame_type, $payload);
572
                }
573
574
                // Not the channel we were looking for.  Queue this frame
575
                //for later, when the other channel is looking for frames.
576
                // Make sure the channel still exists, it could have been
577
                // closed by a previous Exception.
578 6
                if (isset($this->channels[$frame_channel])) {
579 6
                    array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
0 ignored issues
show
Bug introduced by
The property frame_queue cannot be accessed from this context as it is declared protected in class PhpAmqpLib\Channel\AbstractChannel.

This check looks for access to properties that are not accessible from the current context.

If you need to make a property accessible to another context you can either raise its visibility level or provide an accessible getter in the defining class.

Loading history...
580 5
                }
581
582
                // If we just queued up a method for channel 0 (the Connection
583
                // itself) it's probably a close method in reaction to some
584
                // error, so deal with it right away.
585 6
                if (($frame_type == 1) && ($frame_channel == 0)) {
586
                    $this->wait();
587
                }
588
            }
589 5
        }
590
    }
591
592
    /**
593
     * Fetches a channel object identified by the numeric channel_id, or
594
     * create that object if it doesn't already exist.
595
     *
596
     * @param string $channel_id
597
     * @return AMQPChannel
598
     */
599 78
    public function channel($channel_id = null)
600
    {
601 78
        if (isset($this->channels[$channel_id])) {
602
            return $this->channels[$channel_id];
603
        }
604
605 78
        $channel_id = $channel_id ? $channel_id : $this->get_free_channel_id();
606 78
        $ch = new AMQPChannel($this->connection, $channel_id);
607 78
        $this->channels[$channel_id] = $ch;
608
609 78
        return $ch;
610
    }
611
612
    /**
613
     * Requests a connection close
614
     *
615
     * @param int $reply_code
616
     * @param string $reply_text
617
     * @param array $method_sig
618
     * @return mixed|null
619
     */
620 78
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
621
    {
622 78
        if ($this->io instanceof StreamIO)
623 65
        {
624 36
            $this->io->disableHeartbeat();
625 30
        }
626
627 78
        if (!$this->protocolWriter || !$this->isConnected()) {
628 6
            return null;
629
        }
630
631 78
        $this->closeChannels();
632
633 78
        list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
634 65
            $reply_code,
635 65
            $reply_text,
636 78
            $method_sig[0],
637 78
            $method_sig[1]
638 65
        );
639 78
        $this->send_method_frame(array($class_id, $method_id), $args);
640
641 78
        $this->setIsConnected(false);
642
643 78
        return $this->wait(array(
644 78
            $this->waitHelper->get_wait('connection.close_ok')
645 65
        ));
646
    }
647
648
    /**
649
     * @param AMQPReader $reader
650
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException
651
     */
652
    protected function connection_close(AMQPReader $reader)
653
    {
654
        $reply_code = $reader->read_short();
655
        $reply_text = $reader->read_shortstr();
656
        $class_id = $reader->read_short();
657
        $method_id = $reader->read_short();
658
659
        $this->x_close_ok();
660
661
        throw new AMQPProtocolConnectionException($reply_code, $reply_text, array($class_id, $method_id));
662
    }
663
664
    /**
665
     * Confirms a connection close
666
     */
667
    protected function x_close_ok()
668
    {
669
        $this->send_method_frame(
670
            explode(',', $this->waitHelper->get_wait('connection.close_ok'))
671
        );
672
        $this->do_close();
673
    }
674
675
    /**
676
     * Confirm a connection close
677
     *
678
     * @param AMQPReader $args
679
     */
680 78
    protected function connection_close_ok($args)
0 ignored issues
show
Unused Code introduced by
The parameter $args is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
681
    {
682 78
        $this->do_close();
683 78
    }
684
685
    /**
686
     * @param string $virtual_host
687
     * @param string $capabilities
688
     * @param bool $insist
689
     * @return mixed
690
     */
691 78
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
692
    {
693 78
        $args = new AMQPWriter();
694 78
        $args->write_shortstr($virtual_host);
695 78
        $args->write_shortstr($capabilities);
696 78
        $args->write_bits(array($insist));
697 78
        $this->send_method_frame(array(10, 40), $args);
698
699
        $wait = array(
700 78
            $this->waitHelper->get_wait('connection.open_ok')
701 65
        );
702
703 78
        if ($this->protocolVersion == '0.8') {
704
            $wait[] = $this->waitHelper->get_wait('connection.redirect');
705
        }
706
707 78
        return $this->wait($wait);
708
    }
709
710
    /**
711
     * Signals that the connection is ready
712
     *
713
     * @param AMQPReader $args
714
     */
715 78
    protected function connection_open_ok($args)
716
    {
717 78
        $this->known_hosts = $args->read_shortstr();
718 78
        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
719 78
    }
720
721
    /**
722
     * Asks the client to use a different server
723
     *
724
     * @param AMQPReader $args
725
     * @return string
726
     */
727
    protected function connection_redirect($args)
728
    {
729
        $host = $args->read_shortstr();
730
        $this->known_hosts = $args->read_shortstr();
731
        $this->debug->debug_msg(sprintf(
732
                'Redirected to [%s], known_hosts [%s]',
733
                $host,
734
                $this->known_hosts
735
            ));
736
737
        return $host;
738
    }
739
740
    /**
741
     * Security mechanism challenge
742
     *
743
     * @param AMQPReader $args
744
     */
745
    protected function connection_secure($args)
746
    {
747
        $challenge = $args->read_longstr();
0 ignored issues
show
Unused Code introduced by
$challenge is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
748
    }
749
750
    /**
751
     * Security mechanism response
752
     *
753
     * @param string $response
754
     */
755
    protected function x_secure_ok($response)
756
    {
757
        $args = new AMQPWriter();
758
        $args->write_longstr($response);
759
        $this->send_method_frame(array(10, 21), $args);
760
    }
761
762
    /**
763
     * Starts connection negotiation
764
     *
765
     * @param AMQPReader $args
766
     */
767 78
    protected function connection_start($args)
768
    {
769 78
        $this->version_major = $args->read_octet();
770 78
        $this->version_minor = $args->read_octet();
771 78
        $this->server_properties = $args->read_table();
0 ignored issues
show
Documentation Bug introduced by
It seems like $args->read_table() can also be of type object<PhpAmqpLib\Wire\AMQPTable>. However, the property $server_properties is declared as type array. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
772 78
        $this->mechanisms = explode(' ', $args->read_longstr());
773 78
        $this->locales = explode(' ', $args->read_longstr());
774
775 78
        $this->debug->debug_connection_start(
776 78
            $this->version_major,
777 78
            $this->version_minor,
778 78
            $this->server_properties,
0 ignored issues
show
Bug introduced by
It seems like $this->server_properties can also be of type object<PhpAmqpLib\Wire\AMQPTable>; however, PhpAmqpLib\Helper\DebugH...ebug_connection_start() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
779 78
            $this->mechanisms,
780 78
            $this->locales
781 65
        );
782 78
    }
783
784
    /**
785
     * @param AMQPTable|array $clientProperties
786
     * @param string $mechanism
787
     * @param string $response
788
     * @param string $locale
789
     */
790 78
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
791
    {
792 78
        $args = new AMQPWriter();
793 78
        $args->write_table($clientProperties);
794 78
        $args->write_shortstr($mechanism);
795 78
        $args->write_longstr($response);
796 78
        $args->write_shortstr($locale);
797 78
        $this->send_method_frame(array(10, 11), $args);
798 78
    }
799
800
    /**
801
     * Proposes connection tuning parameters
802
     *
803
     * @param AMQPReader $args
804
     */
805 78
    protected function connection_tune($args)
806
    {
807 78
        $v = $args->read_short();
808 78
        if ($v) {
809
            $this->channel_max = $v;
810
        }
811
812 78
        $v = $args->read_long();
813 78
        if ($v) {
814 78
            $this->frame_max = $v;
0 ignored issues
show
Documentation Bug introduced by
The property $frame_max was declared of type integer, but $v is of type string. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
815 65
        }
816
817
        // use server proposed value if not set
818 78
        if ($this->heartbeat === null) {
819
            $this->heartbeat = $args->read_short();
820
        }
821
822 78
        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
823 78
    }
824
825
    /**
826
     * Negotiates connection tuning parameters
827
     *
828
     * @param int $channel_max
829
     * @param int $frame_max
830
     * @param int $heartbeat
831
     */
832 78
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
833
    {
834 78
        $args = new AMQPWriter();
835 78
        $args->write_short($channel_max);
836 78
        $args->write_long($frame_max);
837 78
        $args->write_short($heartbeat);
838 78
        $this->send_method_frame(array(10, 31), $args);
839 78
        $this->wait_tune_ok = false;
840 78
    }
841
842
    /**
843
     * @return SocketIO
844
     */
845
    public function getSocket()
846
    {
847
        return $this->io->getSocket();
848
    }
849
850
    /**
851
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
852
     */
853 60
    protected function getIO()
854
    {
855 60
        return $this->io;
856
    }
857
858
    /**
859
     * Handles connection blocked notifications
860
     *
861
     * @param AMQPReader $args
862
     */
863
    protected function connection_blocked(AMQPReader $args)
864
    {
865
        // Call the block handler and pass in the reason
866
        $this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr()));
867
    }
868
869
    /**
870
     * Handles connection unblocked notifications
871
     *
872
     * @param AMQPReader $args
873
     */
874
    protected function connection_unblocked(AMQPReader $args)
0 ignored issues
show
Unused Code introduced by
The parameter $args is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
875
    {
876
        // No args to an unblock event
877
        $this->dispatch_to_handler($this->connection_unblock_handler, array());
878
    }
879
880
    /**
881
     * Sets a handler which is called whenever a connection.block is sent from the server
882
     *
883
     * @param callable $callback
884
     */
885
    public function set_connection_block_handler($callback)
886
    {
887
        $this->connection_block_handler = $callback;
888
    }
889
890
    /**
891
     * Sets a handler which is called whenever a connection.block is sent from the server
892
     *
893
     * @param callable $callback
894
     */
895
    public function set_connection_unblock_handler($callback)
896
    {
897
        $this->connection_unblock_handler = $callback;
898
    }
899
900
    /**
901
     * Gets the connection status
902
     *
903
     * @return bool
904
     */
905 84
    public function isConnected()
906
    {
907 84
        return (bool) $this->is_connected;
908
    }
909
910
    /**
911
     * Set the connection status
912
     *
913
     * @param bool $is_connected
914
     */
915 84
    protected function setIsConnected($is_connected)
916
    {
917 84
        $this->is_connected = (bool) $is_connected;
918 84
    }
919
920
    /**
921
     * Closes all available channels
922
     */
923 84
    protected function closeChannels()
924
    {
925 84
        foreach ($this->channels as $key => $channel) {
926
            // channels[0] is this connection object, so don't close it yet
927 78
            if ($key === 0) {
928 78
                continue;
929
            }
930
            try {
931 36
                $channel->close();
932 36
            } catch (\Exception $e) {
933
                /* Ignore closing errors */
934
            }
935 70
        }
936 84
    }
937
938
    /**
939
     * Should the connection be attempted during construction?
940
     *
941
     * @return bool
942
     */
943 60
    public function connectOnConstruct()
944
    {
945 60
        return true;
946
    }
947
948
    /**
949
     * @return array
950
     */
951
    public function getServerProperties()
952
    {
953
        return $this->server_properties;
954
    }
955
}
956