Completed
Push — master ( 9b90e7...a2b649 )
by John
17:01 queued 14:42
created

AbstractConnection   D

Complexity

Total Complexity 89

Size/Duplication

Total Lines 922
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 10

Test Coverage

Coverage 73.98%

Importance

Changes 31
Bugs 7 Features 2
Metric Value
wmc 89
c 31
b 7
f 2
lcom 2
cbo 10
dl 0
loc 922
ccs 273
cts 369
cp 0.7398
rs 4.4444

43 Methods

Rating   Name   Duplication   Size   Complexity  
B close() 0 27 4
A x_close_ok() 0 7 1
A connection_close_ok() 0 4 1
A x_open() 0 18 2
A connection_open_ok() 0 5 1
A connection_redirect() 0 12 1
A connection_secure() 0 4 1
A x_secure_ok() 0 6 1
A connection_start() 0 16 1
A connection_tune() 0 19 4
A x_tune_ok() 0 9 1
A getSocket() 0 4 1
A getIO() 0 4 1
A connection_blocked() 0 5 1
A connection_unblocked() 0 5 1
A set_connection_block_handler() 0 4 1
A set_connection_unblock_handler() 0 4 1
A isConnected() 0 4 1
A setIsConnected() 0 4 1
A closeChannels() 0 14 4
A connectOnConstruct() 0 4 1
B __construct() 0 46 4
B connect() 0 48 5
A reconnect() 0 9 1
A __clone() 0 4 1
A __destruct() 0 6 2
A safeClose() 0 10 4
A select() 0 4 1
A set_close_on_destruct() 0 4 1
A close_input() 0 9 2
A close_socket() 0 8 2
A write() 0 11 2
A do_close() 0 6 1
A get_free_channel_id() 0 10 3
A send_content() 0 5 1
B prepare_content() 0 55 5
A send_channel_method_frame() 0 6 1
A prepare_channel_method_frame() 0 23 3
B wait_frame() 0 43 4
D wait_channel() 0 42 10
A channel() 0 12 3
A connection_close() 0 11 1
A x_start_ok() 0 9 1

How to fix   Complexity   

Complex Class

Complex classes like AbstractConnection often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use AbstractConnection, and based on these observations, apply Extract Interface, too.

1
<?php
2
namespace PhpAmqpLib\Connection;
3
4
use PhpAmqpLib\Channel\AMQPChannel;
5
use PhpAmqpLib\Channel\AbstractChannel;
6
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPTable;
11
use PhpAmqpLib\Wire\AMQPWriter;
12
use PhpAmqpLib\Wire\IO\AbstractIO;
13
use PhpAmqpLib\Wire\IO\SocketIO;
14
use PhpAmqpLib\Wire\IO\StreamIO;
15
16
class AbstractConnection extends AbstractChannel
17
{
18
    /** @var array */
19
    public static $LIBRARY_PROPERTIES = array(
20
        'product' => array('S', 'AMQPLib'),
21
        'platform' => array('S', 'PHP'),
22
        'version' => array('S', '2.4'),
23
        'information' => array('S', ''),
24
        'copyright' => array('S', ''),
25
        'capabilities' => array(
26
            'F',
27
            array(
28
                'authentication_failure_close' => array('t', true),
29
                'publisher_confirms' => array('t', true),
30
                'consumer_cancel_notify' => array('t', true),
31
                'exchange_exchange_bindings' => array('t', true),
32
                'basic.nack' => array('t', true),
33
                'connection.blocked' => array('t', true)
34
            )
35
        )
36
    );
37
38
    /** @var AMQPChannel[] */
39
    public $channels = array();
40
41
    /** @var int */
42
    protected $version_major;
43
44
    /** @var int */
45
    protected $version_minor;
46
47
    /** @var array */
48
    protected $server_properties;
49
50
    /** @var array */
51
    protected $mechanisms;
52
53
    /** @var array */
54
    protected $locales;
55
56
    /** @var bool */
57
    protected $wait_tune_ok;
58
59
    /** @var string */
60
    protected $known_hosts;
61
62
    /** @var AMQPReader */
63
    protected $input;
64
65
    /** @var string */
66
    protected $vhost;
67
68
    /** @var bool */
69
    protected $insist;
70
71
    /** @var string */
72
    protected $login_method;
73
74
    /** @var string */
75
    protected $login_response;
76
77
    /** @var string */
78
    protected $locale;
79
80
    /** @var int */
81
    protected $heartbeat;
82
83
    /** @var SocketIO */
84
    protected $sock;
85
86
    /** @var int */
87
    protected $channel_max = 65535;
88
89
    /** @var int */
90
    protected $frame_max = 131072;
91
92
     /** @var array Constructor parameters for clone */
93
    protected $construct_params;
94
95
    /** @var bool Close the connection in destructor */
96
    protected $close_on_destruct = true;
97
98
    /** @var bool Maintain connection status */
99
    protected $is_connected = false;
100
101
    /** @var \PhpAmqpLib\Wire\IO\AbstractIO */
102
    protected $io;
103
104
    /** @var \PhpAmqpLib\Wire\AMQPReader */
105
    protected $wait_frame_reader;
106
107
    /** @var callable Handles connection blocking from the server */
108
    private $connection_block_handler;
109
110
    /** @var callable Handles connection unblocking from the server */
111
    private $connection_unblock_handler;
112
113
    /**
114
     * Circular buffer to speed up prepare_content().
115
     * Max size limited by $prepare_content_cache_max_size.
116
     *
117
     * @var array
118
     * @see prepare_content()
119
     */
120
    private $prepare_content_cache;
121
122
    /** @var int Maximal size of $prepare_content_cache */
123
    private $prepare_content_cache_max_size;
124
125
    /**
     * Stores the connection settings, builds the login response from the
     * credentials and, when connectOnConstruct() says so, connects right away.
     *
     * Note: the $login_response argument is never used as given. When both
     * $user and $password are truthy the response is built from them (a
     * LOGIN/PASSWORD table with its 4-byte length prefix stripped); otherwise
     * the response is reset to null.
     *
     * @param string $user
     * @param string $password
     * @param string $vhost
     * @param bool $insist
     * @param string $login_method
     * @param string|null $login_response
     * @param string $locale
     * @param AbstractIO $io
     * @param int $heartbeat
     * @throws \Exception
     */
    public function __construct(
        $user,
        $password,
        $vhost = '/',
        $insist = false,
        $login_method = 'AMQPLAIN',
        $login_response = null,
        $locale = 'en_US',
        AbstractIO $io,
        $heartbeat = 0
    ) {
        // save the params for the use of __clone
        $this->construct_params = func_get_args();

        $this->wait_frame_reader = new AMQPReader(null);
        $this->vhost = $vhost;
        $this->insist = $insist;
        $this->login_method = $login_method;
        $this->locale = $locale;
        $this->io = $io;
        $this->heartbeat = $heartbeat;

        // The property is documented as a string, so the writer used to
        // serialise the credentials is kept in a local variable instead of
        // being parked in the property itself.
        $this->login_response = null;
        if ($user && $password) {
            $credentials = new AMQPWriter();
            $credentials->write_table(array(
                'LOGIN' => array('S', $user),
                'PASSWORD' => array('S', $password)
            ));

            // Skip the length
            $encoded = $credentials->getvalue();
            $this->login_response = mb_substr($encoded, 4, mb_strlen($encoded, 'ASCII') - 4, 'ASCII');
        }

        $this->prepare_content_cache = array();
        $this->prepare_content_cache_max_size = 100;

        // Lazy Connection waits on connecting
        if ($this->connectOnConstruct()) {
            $this->connect();
        }
    }
183
184
    /**
     * Connects to the AMQP server.
     *
     * Each pass of the outer loop connects the IO layer, sends the protocol
     * header, answers connection.start, waits on connection.secure /
     * connection.tune while $wait_tune_ok stays true (presumably cleared by
     * the tune handler, which is not shown here) and finally opens the vhost.
     * A truthy result from x_open() is treated as a redirect: channels and
     * socket are closed and the loop runs again.
     *
     * @return null
     * @throws \Exception Any failure is rethrown after the connection has been
     *                    flagged as disconnected and its channels closed.
     */
    protected function connect()
    {
        try {
            // Loop until we connect
            while (!$this->isConnected()) {
                // Assume we will connect, until we dont
                $this->setIsConnected(true);

                // Connect the socket
                $this->getIO()->connect();

                $this->channels = array();

                // The connection object itself is treated as channel 0, which is
                // why the parent constructor is deliberately invoked from here
                // rather than from __construct().
                parent::__construct($this, 0);

                $this->input = new AMQPReader(null, $this->getIO());

                $this->write($this->amqp_protocol_header);

                $startWait = array($this->waitHelper->get_wait('connection.start'));
                $this->wait($startWait);

                $this->x_start_ok(
                    self::$LIBRARY_PROPERTIES,
                    $this->login_method,
                    $this->login_response,
                    $this->locale
                );

                // The flag is raised first, so the wait below always runs at
                // least once.
                $this->wait_tune_ok = true;
                do {
                    $this->wait(array(
                        $this->waitHelper->get_wait('connection.secure'),
                        $this->waitHelper->get_wait('connection.tune')
                    ));
                } while ($this->wait_tune_ok);

                $redirectHost = $this->x_open($this->vhost, '', $this->insist);
                if (!$redirectHost) {
                    return null; // we weren't redirected
                }

                $this->setIsConnected(false);
                $this->closeChannels();

                // we were redirected, close the socket, loop and try again
                $this->close_socket();
            }
        } catch (\Exception $failure) {
            // Something went wrong, set the connection status
            $this->setIsConnected(false);
            $this->closeChannels();

            throw $failure; // Rethrow exception
        }
    }
235
236
    /**
     * Reconnects using the original connection settings.
     * Channels that were established previously are not recreated.
     */
    public function reconnect()
    {
        // Best-effort close of the current AMQP connection; safeClose()
        // swallows any exception raised while closing.
        $this->safeClose();

        // Reconnect the socket/stream first, then redo the AMQP handshake.
        $io = $this->getIO();
        $io->reconnect();

        // getIO can initiate the connection setting via LazyConnection, set it here to be sure
        $this->setIsConnected(false);

        $this->connect();
    }
249
250
    /**
     * Cloning re-runs the constructor with the arguments the original object
     * was built with, so the copy targets the same server.
     *
     * NOTE(review): the clone starts with the original's $is_connected value,
     * and connect() only loops while the connection is not flagged as
     * connected - confirm that a clone of a live connection really opens a
     * connection of its own.
     */
    public function __clone()
    {
        $constructor = array($this, '__construct');

        call_user_func_array($constructor, $this->construct_params);
    }
257
258
    /**
     * Closes the connection on destruction unless that was switched off via
     * set_close_on_destruct().
     */
    public function __destruct()
    {
        if (!$this->close_on_destruct) {
            return;
        }

        $this->safeClose();
    }
264
265
    /**
     * Attempts to close the connection safely.
     *
     * close() is only attempted when an input reader is set, and any exception
     * raised while closing is deliberately discarded.
     */
    protected function safeClose()
    {
        try {
            // !empty() is the same test as isset() && truthy
            if (!empty($this->input)) {
                $this->close();
            }
        } catch (\Exception $ignored) {
            // Nothing here
        }
    }
278
279
    /**
     * Delegates to select() on the underlying IO object.
     *
     * @param int $sec
     * @param int $usec
     * @return mixed Whatever the IO object's select() returns
     */
    public function select($sec, $usec = 0)
    {
        $io = $this->getIO();

        return $io->select($sec, $usec);
    }
288
289
    /**
     * Controls whether the destructor closes the connection.
     * Useful after a fork, when the child must not close the connection that
     * belongs to the parent process.
     *
     * @param bool $close
     */
    public function set_close_on_destruct($close = true)
    {
        // Normalise any truthy/falsy value to a real boolean
        $this->close_on_destruct = $close ? true : false;
    }
299
300 48
    /**
     * Closes the input reader, if there is one, and drops the reference to it.
     */
    protected function close_input()
    {
        $this->debug->debug_msg('closing input');

        if ($this->input === null) {
            return;
        }

        $this->input->close();
        $this->input = null;
    }
309
310 48
    /**
     * Closes the underlying IO object, if getIO() provides one.
     */
    protected function close_socket()
    {
        $this->debug->debug_msg('closing socket');

        if (is_null($this->getIO())) {
            return;
        }

        $this->getIO()->close();
    }
318
319
    /**
     * Hands raw data to the IO layer, after passing it to the debug hexdump.
     *
     * @param string $data
     * @throws AMQPRuntimeException Rethrown after the connection has been
     *                              flagged as disconnected.
     */
    public function write($data)
    {
        $this->debug->debug_hexdump($data);

        try {
            $this->getIO()->write($data);
        } catch (AMQPRuntimeException $writeFailure) {
            // A failed write means the connection can no longer be trusted
            $this->setIsConnected(false);

            throw $writeFailure;
        }
    }
333
334 48
    /**
     * Flags the connection as closed, then closes the input reader and the
     * socket, in that order.
     */
    protected function do_close()
    {
        $this->setIsConnected(false);

        $this->close_input();
        $this->close_socket();
    }
340
341
    /**
     * Returns the lowest channel id, from 1 up to $channel_max, that has no
     * entry in $channels.
     *
     * @return int
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When every id is taken
     */
    public function get_free_channel_id()
    {
        $candidate = 1;

        while ($candidate <= $this->channel_max) {
            if (!isset($this->channels[$candidate])) {
                return $candidate;
            }

            $candidate++;
        }

        throw new AMQPRuntimeException('No free channel ids');
    }
355
356
    /**
     * Builds the content frames via prepare_content() and writes them out.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter|null $pkt Optional writer to append to; a new one is
     *                             created by prepare_content() when omitted.
     */
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        // prepare_content() creates its own writer when $pkt is null, so the
        // writer it returns must be used here. Reading from the argument
        // instead would call getvalue() on null whenever $pkt was omitted.
        $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);

        $this->write($pkt->getvalue());
    }
370
371
    /**
     * Returns a new AMQPWriter or mutates the provided $pkt.
     *
     * Writes one header frame (type 2) followed by as many body frames
     * (type 3) as are needed to carry $body in chunks of $frame_max - 8 bytes.
     * The leading part of the header frame is cached per
     * channel/properties/class/weight combination.
     *
     * @param string $channel
     * @param int $class_id
     * @param int $weight
     * @param int $body_size
     * @param string $packed_properties
     * @param string $body
     * @param AMQPWriter $pkt
     * @return AMQPWriter
     */
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
    {
        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        // Content already prepared ?
        $cacheKey = sprintf('%s|%s|%s|%s', $channel, $packed_properties, $class_id, $weight);

        if (!isset($this->prepare_content_cache[$cacheKey])) {
            $header = new AMQPWriter();
            $header->write_octet(2);
            $header->write_short($channel);
            // 12 presumably covers class id (2) + weight (2) + body size (8) - TODO confirm
            $header->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
            $header->write_short($class_id);
            $header->write_short($weight);
            $this->prepare_content_cache[$cacheKey] = $header->getvalue();

            // Over the size limit: drop the entry at the front of the array
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
                reset($this->prepare_content_cache);
                $oldestKey = key($this->prepare_content_cache);
                unset($this->prepare_content_cache[$oldestKey]);
            }
        }

        $pkt->write($this->prepare_content_cache[$cacheKey]);
        $pkt->write_longlong($body_size);
        $pkt->write($packed_properties);
        $pkt->write_octet(0xCE);

        // memory efficiency: walk the string instead of biting
        // it. good for very large packets (close in size to
        // memory_limit setting)
        $chunkSize = $this->frame_max - 8;
        $bodyLength = mb_strlen($body, 'ASCII');

        for ($offset = 0; $offset < $bodyLength; $offset += $chunkSize) {
            $chunk = mb_substr($body, $offset, $chunkSize, 'ASCII');

            $pkt->write_octet(3);
            $pkt->write_short($channel);
            $pkt->write_long(mb_strlen($chunk, 'ASCII'));
            $pkt->write($chunk);
            $pkt->write_octet(0xCE);
        }

        return $pkt;
    }
438
439
    /**
     * Builds a method frame for the given channel and writes it out.
     *
     * @param string $channel
     * @param string[] $method_sig Class id at index 0, method id at index 1
     * @param string|AMQPWriter $args
     * @param AMQPWriter|null $pkt
     */
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $frame = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);

        $this->write($frame->getvalue());

        $this->debug->debug_method_signature1($method_sig);
    }
451
452
    /**
     * Returns a new AMQPWriter or mutates the provided $pkt.
     *
     * Writes a single method frame (type 1): channel, payload length, class
     * id, method id, the arguments and the 0xCE end marker.
     *
     * @param string $channel
     * @param string[] $method_sig Class id at index 0, method id at index 1
     * @param string|AMQPWriter $args A writer is flattened to its value first
     * @param AMQPWriter $pkt
     * @return AMQPWriter
     */
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
    {
        $payload = $args instanceof AMQPWriter ? $args->getvalue() : $args;

        if (!$pkt) {
            $pkt = new AMQPWriter();
        }

        $pkt->write_octet(1);
        $pkt->write_short($channel);

        // 4 = length of class_id and method_id in payload
        $pkt->write_long(mb_strlen($payload, 'ASCII') + 4);

        $pkt->write_short($method_sig[0]); // class_id
        $pkt->write_short($method_sig[1]); // method_id
        $pkt->write($payload);

        $pkt->write_octet(0xCE);

        $this->debug->debug_method_signature1($method_sig);

        return $pkt;
    }
484
485
    /**
     * Waits for a frame from the server.
     *
     * The input reader's timeout is switched to $timeout for the duration of
     * the read. It is put back afterwards, and also when the read times out.
     *
     * @param int $timeout
     * @return array Frame type, channel and payload, in that order
     * @throws \Exception
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When there is no input
     *         reader, or the frame does not end with 0xCE
     */
    protected function wait_frame($timeout = 0)
    {
        if ($this->input === null) {
            $this->setIsConnected(false);
            throw new AMQPRuntimeException('Broken pipe or closed connection');
        }

        $previousTimeout = $this->input->getTimeout();
        $this->input->setTimeout($timeout);

        try {
            // frame_type + channel_id + size
            $headerLength = AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG;
            $this->wait_frame_reader->reuse($this->input->read($headerLength));

            $frame_type = $this->wait_frame_reader->read_octet();
            $channel = $this->wait_frame_reader->read_short();
            $size = $this->wait_frame_reader->read_long();

            // payload + ch
            $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

            $payload = $this->wait_frame_reader->read($size);
            $frameEnd = $this->wait_frame_reader->read_octet();
        } catch (AMQPTimeoutException $timedOut) {
            $this->input->setTimeout($previousTimeout);
            throw $timedOut;
        }

        $this->input->setTimeout($previousTimeout);

        if ($frameEnd != 0xCE) {
            throw new AMQPRuntimeException(sprintf('Framing error, unexpected byte: %x', $frameEnd));
        }

        return array($frame_type, $channel, $payload);
    }
537
538
    /**
     * Waits for a frame from the server destined for a particular channel.
     *
     * Heartbeat frames are skipped. Frames addressed to another channel are
     * appended to that channel's frame queue, if the channel still exists.
     *
     * @param string $channel_id
     * @param int $timeout
     * @return array Frame type and payload, in that order
     * @throws AMQPTimeoutException When skipping heartbeats uses up a positive timeout
     */
    protected function wait_channel($channel_id, $timeout = 0)
    {
        // Keeping the original timeout unchanged.
        $remaining = $timeout;

        while (true) {
            $startedAt = time();
            list($frame_type, $frame_channel, $payload) = $this->wait_frame($remaining);

            if ($frame_channel === 0 && $frame_type === 8) {
                // skip heartbeat frames and reduce the timeout by the time passed
                if ($remaining > 0) {
                    $remaining -= time() - $startedAt;

                    if ($remaining <= 0) {
                        // If timeout has been reached, throw the exception without calling wait_frame
                        throw new AMQPTimeoutException("Timeout waiting on channel");
                    }
                }

                continue;
            }

            if ($frame_channel == $channel_id) {
                return array($frame_type, $payload);
            }

            // Not the channel we were looking for. Queue this frame for later,
            // when the other channel is looking for frames. Make sure the
            // channel still exists, it could have been closed by a previous
            // Exception.
            // NOTE(review): frame_queue is reported as protected on
            // AbstractChannel; presumably reachable here because this class
            // extends AbstractChannel - confirm.
            if (isset($this->channels[$frame_channel])) {
                array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
            }

            // If we just queued up a method for channel 0 (the Connection
            // itself) it's probably a close method in reaction to some
            // error, so deal with it right away.
            if ($frame_type == 1 && $frame_channel == 0) {
                $this->wait();
            }
        }
    }
587
588
    /**
     * Fetches the channel object registered under $channel_id, or creates and
     * registers a new one when none exists yet. A falsy $channel_id picks the
     * next free id via get_free_channel_id().
     *
     * @param int|string|null $channel_id
     * @return AMQPChannel
     */
    public function channel($channel_id = null)
    {
        if (isset($this->channels[$channel_id])) {
            return $this->channels[$channel_id];
        }

        if (!$channel_id) {
            $channel_id = $this->get_free_channel_id();
        }

        $created = new AMQPChannel($this->connection, $channel_id);
        $this->channels[$channel_id] = $created;

        return $created;
    }
607
608
    /**
     * Requests a connection close.
     *
     * Heartbeats are disabled first when the IO object is a StreamIO. Nothing
     * else happens, and null is returned, when there is no protocol writer or
     * the connection is not flagged as connected.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig
     * @return mixed|null
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        if ($this->io instanceof StreamIO) {
            $this->io->disableHeartbeat();
        }

        $canClose = $this->protocolWriter && $this->isConnected();
        if (!$canClose) {
            return null;
        }

        $this->closeChannels();

        $frame = $this->protocolWriter->connectionClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        // Flag as disconnected before waiting for the server's confirmation
        $this->setIsConnected(false);

        $closeOk = array($this->waitHelper->get_wait('connection.close_ok'));

        return $this->wait($closeOk);
    }
643
644
    /**
     * Handles a close request: reads the reason, confirms the close and then
     * raises it as an exception.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException Always,
     *         carrying the reply code, the reply text and the class/method id pair
     */
    protected function connection_close(AMQPReader $reader)
    {
        $code = $reader->read_short();
        $text = $reader->read_shortstr();
        $failedClassId = $reader->read_short();
        $failedMethodId = $reader->read_short();

        $this->x_close_ok();

        throw new AMQPProtocolConnectionException($code, $text, array($failedClassId, $failedMethodId));
    }
659
660
    /**
     * Confirms a connection close: sends connection.close_ok, then tears down
     * the local connection via do_close().
     */
    protected function x_close_ok()
    {
        // get_wait() yields a comma separated signature string, which is split
        // into the array form send_method_frame() is given here
        $signature = explode(',', $this->waitHelper->get_wait('connection.close_ok'));

        $this->send_method_frame($signature);
        $this->do_close();
    }
670
671
    /**
     * Handles the confirmation of a connection close by tearing down the
     * local connection.
     *
     * @param mixed $args Not read by this method (presumably an AMQPReader like
     *                    the other handlers receive - confirm)
     */
    protected function connection_close_ok($args)
    {
        $this->do_close();
    }
678
679
    /**
     * Sends method (10, 40) with the virtual host, capabilities and insist
     * flag, then waits for connection.open_ok. With protocol version 0.8,
     * connection.redirect is accepted as well.
     *
     * @param string $virtual_host
     * @param string $capabilities
     * @param bool $insist
     * @return mixed Result of wait(); connect() treats a truthy value as a redirect
     */
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
    {
        $frameArgs = new AMQPWriter();
        $frameArgs->write_shortstr($virtual_host);
        $frameArgs->write_shortstr($capabilities);
        $frameArgs->write_bits(array($insist));

        // The arguments are handed over as a writer object, not as a string
        $this->send_method_frame(array(10, 40), $frameArgs);

        $expected = array($this->waitHelper->get_wait('connection.open_ok'));

        if ($this->protocolVersion == '0.8') {
            $expected[] = $this->waitHelper->get_wait('connection.redirect');
        }

        return $this->wait($expected);
    }
703
704
    /**
     * Signals that the connection is ready; records the known hosts string
     * sent along with it.
     *
     * @param AMQPReader $args
     */
    protected function connection_open_ok($args)
    {
        $knownHosts = $args->read_shortstr();
        $this->known_hosts = $knownHosts;

        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
    }
714
715
    /**
     * Asks the client to use a different server.
     *
     * Reads the target host, then the known hosts string (stored on the
     * connection), and returns the target host.
     *
     * @param AMQPReader $args
     * @return string
     */
    protected function connection_redirect($args)
    {
        $redirectHost = $args->read_shortstr();
        $this->known_hosts = $args->read_shortstr();

        $message = sprintf('Redirected to [%s], known_hosts [%s]', $redirectHost, $this->known_hosts);
        $this->debug->debug_msg($message);

        return $redirectHost;
    }
733
734
    /**
     * Security mechanism challenge.
     *
     * The challenge is read from the reader but not acted upon.
     *
     * @param AMQPReader $args
     */
    protected function connection_secure($args)
    {
        // The read is kept (presumably it advances the reader past the
        // challenge); its result was never used, so it is no longer stored.
        $args->read_longstr();
    }
743
744
    /**
     * Security mechanism response: sends method (10, 21) carrying $response
     * as a long string.
     *
     * @param string $response
     */
    protected function x_secure_ok($response)
    {
        $frameArgs = new AMQPWriter();
        $frameArgs->write_longstr($response);

        // The arguments are handed over as a writer object, not as a string
        $this->send_method_frame(array(10, 21), $frameArgs);
    }
753
754
    /**
755
     * Starts connection negotiation
756
     *
757
     * @param AMQPReader $args
758
     */
759 48
    protected function connection_start($args)
    {
        // Fields are consumed in the order they appear in the frame:
        // two octets, a table, then two long-strings.
        $this->version_major = $args->read_octet();
        $this->version_minor = $args->read_octet();
        $this->server_properties = $args->read_table();

        // Mechanisms and locales arrive as space-separated lists.
        $mechanismList = $args->read_longstr();
        $this->mechanisms = explode(' ', $mechanismList);

        $localeList = $args->read_longstr();
        $this->locales = explode(' ', $localeList);

        $debug = $this->debug;
        $debug->debug_connection_start(
            $this->version_major,
            $this->version_minor,
            $this->server_properties,
            $this->mechanisms,
            $this->locales
        );
    }
775
776
    /**
777
     * @param AMQPTable|array $clientProperties
778
     * @param string $mechanism
779
     * @param string $response
780
     * @param string $locale
781
     */
782 48
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
    {
        // Serialise the four arguments in the order given by the signature.
        $payload = new AMQPWriter();
        $payload->write_table($clientProperties);
        $payload->write_shortstr($mechanism);
        $payload->write_longstr($response);
        $payload->write_shortstr($locale);

        $methodSig = array(10, 11);
        $this->send_method_frame($methodSig, $payload);
    }
791
792
    /**
793
     * Proposes connection tuning parameters
794
     *
795
     * @param AMQPReader $args
796
     */
797 48
    protected function connection_tune($args)
    {
        // A falsy proposal leaves the currently configured limit untouched.
        $proposedChannelMax = $args->read_short();
        if ($proposedChannelMax) {
            $this->channel_max = $proposedChannelMax;
        }

        $proposedFrameMax = $args->read_long();
        if ($proposedFrameMax) {
            $this->frame_max = $proposedFrameMax;
        }

        // The server's heartbeat value is only read and adopted when no
        // heartbeat has been set on this connection.
        if (null === $this->heartbeat) {
            $this->heartbeat = $args->read_short();
        }

        // Reply with the values now held on the connection.
        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
    }
816
817
    /**
818
     * Negotiates connection tuning parameters
819
     *
820
     * @param $channel_max
821
     * @param $frame_max
822
     * @param $heartbeat
823
     */
824 48
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
    {
        // Payload layout: short, long, short.
        $payload = new AMQPWriter();
        $payload->write_short($channel_max);
        $payload->write_long($frame_max);
        $payload->write_short($heartbeat);

        $methodSig = array(10, 31);
        $this->send_method_frame($methodSig, $payload);

        // Cleared only after the frame has been handed off for sending.
        $this->wait_tune_ok = false;
    }
833
834
    /**
835
     * @return SocketIO
836
     */
837
    public function getSocket()
    {
        // Delegates to the IO object held by this connection.
        $io = $this->io;

        return $io->getSocket();
    }
841
842
    /**
843
     * @return \PhpAmqpLib\Wire\IO\AbstractIO
844
     */
845 36
    protected function getIO()
    {
        // Plain accessor for the IO object held by this connection.
        $io = $this->io;

        return $io;
    }
849
850
    /**
851
     * Handles connection blocked notifications
852
     *
853
     * @param AMQPReader $args
854
     */
855
    protected function connection_blocked(AMQPReader $args)
    {
        // The only argument forwarded to the handler is the short-string
        // read from the frame (the reason).
        $reason = $args->read_shortstr();
        $handlerArgs = array($reason);

        $this->dispatch_to_handler($this->connection_block_handler, $handlerArgs);
    }
860
861
    /**
862
     * Handles connection unblocked notifications
863
     */
864
    protected function connection_unblocked(AMQPReader $args)
    {
        // Nothing is read from $args; the handler is invoked with an
        // empty argument list.
        $noArguments = array();

        $this->dispatch_to_handler($this->connection_unblock_handler, $noArguments);
    }
869
870
    /**
871
     * Sets a handler which is called whenever a connection.block is sent from the server
872
     *
873
     * @param callable $callback
874
     */
875
    public function set_connection_block_handler($callback)
    {
        // Stored without validation; connection_blocked() later passes it
        // to dispatch_to_handler().
        $handler = $callback;
        $this->connection_block_handler = $handler;
    }
879
880
    /**
881
     * Sets a handler which is called whenever a connection.unblock is sent from the server
882
     *
883
     * @param callable $callback
884
     */
885
    public function set_connection_unblock_handler($callback)
    {
        // Stored without validation; connection_unblocked() later passes it
        // to dispatch_to_handler().
        $handler = $callback;
        $this->connection_unblock_handler = $handler;
    }
889
890
    /**
891
     * Gets the connection status
892
     *
893
     * @return bool
894
     */
895 48
    public function isConnected()
    {
        // Normalise whatever is stored in the flag to a strict boolean.
        $connected = (bool) $this->is_connected;

        return $connected;
    }
899
900
    /**
901
     * Set the connection status
902
     *
903
     * @param bool $is_connected
904
     */
905 48
    protected function setIsConnected($is_connected)
    {
        // The flag is always stored as a strict boolean, whatever was passed.
        $flag = (bool) $is_connected;
        $this->is_connected = $flag;
    }
909
910
    /**
911
     * Closes all available channels
912
     */
913 48
    protected function closeChannels()
    {
        foreach ($this->channels as $channelId => $openChannel) {
            // Entry 0 is this connection object itself, so it is left alone here.
            if ($channelId !== 0) {
                try {
                    $openChannel->close();
                } catch (\Exception $ignored) {
                    // Best effort: a channel that fails to close must not
                    // stop the remaining channels from being closed.
                }
            }
        }
    }
927
928
    /**
929
     * Should the connection be attempted during construction?
930
     *
931
     * @return bool
932
     */
933 36
    public function connectOnConstruct()
    {
        // This implementation always answers yes.
        $connectDuringConstruction = true;

        return $connectDuringConstruction;
    }
937
}
938