videlalvaro /
php-amqplib
This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
| 1 | <?php |
||
| 2 | namespace PhpAmqpLib\Connection; |
||
| 3 | |||
| 4 | use PhpAmqpLib\Channel\AbstractChannel; |
||
| 5 | use PhpAmqpLib\Channel\AMQPChannel; |
||
| 6 | use PhpAmqpLib\Exception\AMQPProtocolConnectionException; |
||
| 7 | use PhpAmqpLib\Exception\AMQPRuntimeException; |
||
| 8 | use PhpAmqpLib\Exception\AMQPTimeoutException; |
||
| 9 | use PhpAmqpLib\Wire\AMQPReader; |
||
| 10 | use PhpAmqpLib\Wire\AMQPWriter; |
||
| 11 | use PhpAmqpLib\Wire\IO\AbstractIO; |
||
| 12 | use PhpAmqpLib\Wire\IO\SocketIO; |
||
| 13 | |||
| 14 | class AbstractConnection extends AbstractChannel |
||
| 15 | { |
||
| 16 | /** @var array */ |
||
| 17 | public static $LIBRARY_PROPERTIES = array( |
||
| 18 | 'product' => array('S', 'AMQPLib'), |
||
| 19 | 'platform' => array('S', 'PHP'), |
||
| 20 | 'version' => array('S', '2.4'), |
||
| 21 | 'information' => array('S', ''), |
||
| 22 | 'copyright' => array('S', ''), |
||
| 23 | 'capabilities' => array( |
||
| 24 | 'F', |
||
| 25 | array( |
||
| 26 | 'authentication_failure_close' => array('t', true), |
||
| 27 | 'publisher_confirms' => array('t', true), |
||
| 28 | 'consumer_cancel_notify' => array('t', true), |
||
| 29 | 'exchange_exchange_bindings' => array('t', true), |
||
| 30 | 'basic.nack' => array('t', true), |
||
| 31 | 'connection.blocked' => array('t', true) |
||
| 32 | ) |
||
| 33 | ) |
||
| 34 | ); |
||
| 35 | |||
| 36 | /** @var AMQPChannel[] */ |
||
| 37 | public $channels = array(); |
||
| 38 | |||
| 39 | /** @var int */ |
||
| 40 | protected $version_major; |
||
| 41 | |||
| 42 | /** @var int */ |
||
| 43 | protected $version_minor; |
||
| 44 | |||
| 45 | /** @var array */ |
||
| 46 | protected $server_properties; |
||
| 47 | |||
| 48 | /** @var array */ |
||
| 49 | protected $mechanisms; |
||
| 50 | |||
| 51 | /** @var array */ |
||
| 52 | protected $locales; |
||
| 53 | |||
| 54 | /** @var bool */ |
||
| 55 | protected $wait_tune_ok; |
||
| 56 | |||
| 57 | /** @var string */ |
||
| 58 | protected $known_hosts; |
||
| 59 | |||
| 60 | /** @var AMQPReader */ |
||
| 61 | protected $input; |
||
| 62 | |||
| 63 | /** @var string */ |
||
| 64 | protected $vhost; |
||
| 65 | |||
| 66 | /** @var bool */ |
||
| 67 | protected $insist; |
||
| 68 | |||
| 69 | /** @var string */ |
||
| 70 | protected $login_method; |
||
| 71 | |||
| 72 | /** @var string|null */ |
||
| 73 | protected $login_response; |
||
| 74 | |||
| 75 | /** @var string */ |
||
| 76 | protected $locale; |
||
| 77 | |||
| 78 | /** @var int */ |
||
| 79 | protected $heartbeat; |
||
| 80 | |||
| 81 | /** @var SocketIO */ |
||
| 82 | protected $sock; |
||
| 83 | |||
| 84 | /** @var int */ |
||
| 85 | protected $channel_max = 65535; |
||
| 86 | |||
| 87 | /** @var int */ |
||
| 88 | protected $frame_max = 131072; |
||
| 89 | |||
| 90 | /** @var array Constructor parameters for clone */ |
||
| 91 | protected $construct_params; |
||
| 92 | |||
| 93 | /** @var bool Close the connection in destructor */ |
||
| 94 | protected $close_on_destruct = true; |
||
| 95 | |||
| 96 | /** @var bool Maintain connection status */ |
||
| 97 | protected $is_connected = false; |
||
| 98 | |||
| 99 | /** @var \PhpAmqpLib\Wire\IO\AbstractIO */ |
||
| 100 | protected $io; |
||
| 101 | |||
| 102 | /** @var \PhpAmqpLib\Wire\AMQPReader */ |
||
| 103 | protected $wait_frame_reader; |
||
| 104 | |||
| 105 | /** @var callable Handles connection blocking from the server */ |
||
| 106 | private $connection_block_handler; |
||
| 107 | |||
| 108 | /** @var callable Handles connection unblocking from the server */ |
||
| 109 | private $connection_unblock_handler; |
||
| 110 | |||
| 111 | /** |
||
| 112 | * Circular buffer to speed up prepare_content(). |
||
| 113 | * Max size limited by $prepare_content_cache_max_size. |
||
| 114 | * |
||
| 115 | * @var array |
||
| 116 | * @see prepare_content() |
||
| 117 | */ |
||
| 118 | private $prepare_content_cache; |
||
| 119 | |||
| 120 | /** @var int Maximal size of $prepare_content_cache */ |
||
| 121 | private $prepare_content_cache_max_size; |
||
| 122 | |||
/**
 * Stores the connection settings, prepares the login response and, unless
 * connectOnConstruct() says otherwise, opens the connection immediately.
 *
 * When both $user and $password are given, the login response is built from
 * them. Otherwise the caller-supplied $login_response is kept as is (it used
 * to be silently replaced by null).
 *
 * @param string $user
 * @param string $password
 * @param string $vhost
 * @param bool $insist
 * @param string $login_method
 * @param string|null $login_response Pre-built response; only used when $user/$password are not both set
 * @param string $locale
 * @param AbstractIO $io
 * @param int $heartbeat
 * @throws \Exception
 */
public function __construct(
    $user,
    $password,
    $vhost = '/',
    $insist = false,
    $login_method = 'AMQPLAIN',
    $login_response = null,
    $locale = 'en_US',
    AbstractIO $io,
    $heartbeat = 0
) {
    // Captured first so that __clone() can replay exactly what the caller passed
    $this->construct_params = func_get_args();

    $this->wait_frame_reader = new AMQPReader(null);
    $this->io = $io;
    $this->vhost = $vhost;
    $this->insist = $insist;
    $this->login_method = $login_method;
    $this->locale = $locale;
    $this->heartbeat = $heartbeat;

    if ($user && $password) {
        $credentials = new AMQPWriter();
        $credentials->write_table(array(
            'LOGIN' => array('S', $user),
            'PASSWORD' => array('S', $password)
        ));

        // Skip the 4 leading length bytes of the serialized table
        $serialized = $credentials->getvalue();
        $this->login_response = mb_substr($serialized, 4, mb_strlen($serialized, 'ASCII') - 4, 'ASCII');
    } else {
        // No credentials: honour whatever response the caller provided (null by default)
        $this->login_response = $login_response;
    }

    $this->prepare_content_cache = array();
    $this->prepare_content_cache_max_size = 100;

    // Lazy connections postpone the handshake until it is actually needed
    if ($this->connectOnConstruct()) {
        $this->connect();
    }
}
|
| 181 | |||
/**
 * Connects to the AMQP server.
 *
 * Opens the transport, runs the start/tune handshake and opens the virtual
 * host. If x_open() returns a host (redirect), the socket is closed and the
 * whole sequence is attempted again.
 *
 * @return null
 * @throws \Exception rethrown after the connection has been flagged as disconnected
 */
protected function connect()
{
    try {
        while (!$this->isConnected()) {
            // Optimistically flag as connected; reverted on redirect or failure
            $this->setIsConnected(true);

            $this->getIO()->connect();

            // The connection object itself is treated as channel 0
            $this->channels = array();
            parent::__construct($this, 0);

            $this->input = new AMQPReader(null, $this->getIO());

            $this->write($this->amqp_protocol_header);
            $this->wait(array($this->waitHelper->get_wait('connection.start')));
            $this->x_start_ok(self::$LIBRARY_PROPERTIES, $this->login_method, $this->login_response, $this->locale);

            // Keep processing secure/tune frames until the flag is cleared
            // (presumably by the connection.tune handler, which is outside this view)
            $this->wait_tune_ok = true;
            do {
                $this->wait(array(
                    $this->waitHelper->get_wait('connection.secure'),
                    $this->waitHelper->get_wait('connection.tune')
                ));
            } while ($this->wait_tune_ok);

            $redirect_host = $this->x_open($this->vhost, '', $this->insist);
            if (!$redirect_host) {
                // Not redirected: the connection is ready
                return null;
            }

            // Redirected: tear everything down and go around the loop again
            $this->setIsConnected(false);
            $this->closeChannels();
            $this->close_socket();
        }
    } catch (\Exception $failure) {
        // Record the failed state before letting the caller see the error
        $this->setIsConnected(false);
        $this->closeChannels();
        throw $failure;
    }
}
|
| 233 | |||
/**
 * Reconnects using the original connection settings.
 * Channels that were established previously are not recreated.
 */
public function reconnect()
{
    // Best-effort shutdown of the current AMQP connection
    $this->safeClose();

    // Re-establish the socket/stream first, then the AMQP layer on top of it
    $this->getIO()->reconnect();

    // getIO() can initiate the connection setting via LazyConnection,
    // so force the flag down to make sure connect() really runs
    $this->setIsConnected(false);
    $this->connect();
}
|
| 247 | |||
/**
 * Cloning re-runs the constructor with the arguments saved at construction
 * time, so the clone makes a new connection to the same server.
 */
public function __clone()
{
    $constructor = array($this, '__construct');
    call_user_func_array($constructor, $this->construct_params);
}
||
| 255 | |||
/**
 * Closes the connection on destruction unless that was disabled through
 * set_close_on_destruct().
 */
public function __destruct()
{
    if (!$this->close_on_destruct) {
        return;
    }

    $this->safeClose();
}
||
| 262 | |||
/**
 * Attempts to close the connection safely: close() is only invoked when an
 * input reader exists, and any exception it raises is swallowed.
 */
protected function safeClose()
{
    try {
        if (!empty($this->input)) {
            $this->close();
        }
    } catch (\Exception $ignored) {
        // Deliberately ignored: closing is best effort
    }
}
|
| 276 | |||
/**
 * Delegates to the select() of the underlying IO object.
 *
 * @param int $sec
 * @param int $usec
 * @return mixed whatever the IO layer's select() returns
 */
public function select($sec, $usec = 0)
{
    $io = $this->getIO();

    return $io->select($sec, $usec);
}
||
| 286 | |||
/**
 * Controls whether the destructor closes the connection.
 * Useful after a fork when the parent process connection must stay open.
 *
 * @param bool $close
 */
public function set_close_on_destruct($close = true)
{
    $this->close_on_destruct = $close ? true : false;
}
||
| 297 | |||
/**
 * Closes the input reader, if there is one, and drops the reference to it.
 */
protected function close_input()
{
    $this->debug->debug_msg('closing input');

    if ($this->input === null) {
        return;
    }

    $this->input->close();
    $this->input = null;
}
|
| 307 | |||
/**
 * Closes the underlying IO object, if one is available.
 */
protected function close_socket()
{
    $this->debug->debug_msg('closing socket');

    if ($this->getIO() === null) {
        return;
    }

    $this->getIO()->close();
}
|
| 316 | |||
/**
 * Writes raw data to the IO layer after hex-dumping it to the debug helper.
 * A failed write flags the connection as disconnected before the exception
 * is passed on.
 *
 * @param string $data
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
 */
public function write($data)
{
    $this->debug->debug_hexdump($data);

    try {
        $io = $this->getIO();
        $io->write($data);
    } catch (AMQPRuntimeException $write_error) {
        $this->setIsConnected(false);
        throw $write_error;
    }
}
|
| 331 | |||
/**
 * Local teardown: flags the connection as closed, then releases the input
 * reader and the socket.
 */
protected function do_close()
{
    $this->setIsConnected(false);

    $this->close_input();
    $this->close_socket();
}
|
| 338 | |||
/**
 * Returns the lowest channel id in [1, channel_max] that is not in use.
 *
 * @return int
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when every id is taken
 */
public function get_free_channel_id()
{
    $candidate = 1;

    while ($candidate <= $this->channel_max) {
        if (!isset($this->channels[$candidate])) {
            return $candidate;
        }
        $candidate++;
    }

    throw new AMQPRuntimeException('No free channel ids');
}
||
| 353 | |||
/**
 * Builds the content frames for a message and writes them to the server.
 *
 * @param string $channel
 * @param int $class_id
 * @param int $weight
 * @param int $body_size
 * @param string $packed_properties
 * @param string $body
 * @param AMQPWriter $pkt Optional writer to append to; a new one is created when omitted
 */
public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
{
    // prepare_content() creates the writer itself when $pkt is null, so the
    // returned instance has to be used: the local $pkt would otherwise still
    // be null and getvalue() would fail.
    $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);

    $this->write($pkt->getvalue());
}
|
| 368 | |||
/**
 * Serializes a message into a content header frame followed by as many body
 * frames as needed. Returns a new AMQPWriter or mutates the provided $pkt.
 *
 * @param string $channel
 * @param int $class_id
 * @param int $weight
 * @param int $body_size
 * @param string $packed_properties
 * @param string $body
 * @param AMQPWriter $pkt
 * @return AMQPWriter
 */
public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
{
    if (!$pkt) {
        $pkt = new AMQPWriter();
    }

    // The leading part of the header frame depends only on these four values,
    // so it is cached under a key built from them
    $cache_key = sprintf('%s|%s|%s|%s', $channel, $packed_properties, $class_id, $weight);

    if (!isset($this->prepare_content_cache[$cache_key])) {
        $header = new AMQPWriter();
        $header->write_octet(2);
        $header->write_short($channel);
        // 12 = class_id (2) + weight (2) + body_size (8) that accompany the properties
        $header->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
        $header->write_short($class_id);
        $header->write_short($weight);
        $this->prepare_content_cache[$cache_key] = $header->getvalue();

        // Over the limit: evict the first (oldest inserted) entry
        if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
            reset($this->prepare_content_cache);
            $evicted_key = key($this->prepare_content_cache);
            unset($this->prepare_content_cache[$evicted_key]);
        }
    }

    // Header frame: cached prefix, body size, properties, frame-end octet
    $pkt->write($this->prepare_content_cache[$cache_key]);
    $pkt->write_longlong($body_size);
    $pkt->write($packed_properties);
    $pkt->write_octet(0xCE);

    // Memory efficiency: walk the body in slices instead of cutting pieces
    // off it. Good for very large packets (close to the memory_limit setting).
    $chunk_size = $this->frame_max - 8;
    $body_length = mb_strlen($body, 'ASCII');

    for ($offset = 0; $offset < $body_length; $offset += $chunk_size) {
        $chunk = mb_substr($body, $offset, $chunk_size, 'ASCII');

        $pkt->write_octet(3);
        $pkt->write_short($channel);
        $pkt->write_long(mb_strlen($chunk, 'ASCII'));
        $pkt->write($chunk);
        $pkt->write_octet(0xCE);
    }

    return $pkt;
}
||
| 436 | |||
/**
 * Builds a method frame for the given channel and writes it out.
 *
 * @param string $channel
 * @param array $method_sig class id at index 0, method id at index 1
 * @param string|AMQPWriter $args
 * @param AMQPWriter|null $pkt
 */
protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
{
    $frame = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);

    $this->write($frame->getvalue());
    $this->debug->debug_method_signature1($method_sig);
}
|
| 449 | |||
/**
 * Serializes a method frame. Returns a new AMQPWriter or mutates the
 * provided $pkt.
 *
 * @param string $channel
 * @param array $method_sig class id at index 0, method id at index 1
 * @param string|AMQPWriter $args
 * @param AMQPWriter $pkt
 * @return null|AMQPWriter
 */
protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
{
    // Arguments may arrive as a writer; reduce them to their raw value
    if ($args instanceof AMQPWriter) {
        $args = $args->getvalue();
    }

    if (!$pkt) {
        $pkt = new AMQPWriter();
    }

    $pkt->write_octet(1);
    $pkt->write_short($channel);

    // Payload size: arguments plus 4 for class_id and method_id
    $pkt->write_long(mb_strlen($args, 'ASCII') + 4);

    $pkt->write_short($method_sig[0]); // class_id
    $pkt->write_short($method_sig[1]); // method_id
    $pkt->write($args);

    $pkt->write_octet(0xCE);

    $this->debug->debug_method_signature1($method_sig);

    return $pkt;
}
||
| 482 | |||
/**
 * Waits for a frame from the server and returns it as
 * array(frame type, channel, payload).
 *
 * The reader's timeout is temporarily replaced by $timeout and restored
 * afterwards, both on success and on AMQPTimeoutException.
 * NOTE(review): other exception types leave the temporary timeout in place;
 * confirm whether that is intended.
 *
 * @param int $timeout
 * @return array
 * @throws \Exception
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
 */
protected function wait_frame($timeout = 0)
{
    if ($this->input === null) {
        $this->setIsConnected(false);
        throw new AMQPRuntimeException('Broken pipe or closed connection');
    }

    $previous_timeout = $this->input->getTimeout();
    $this->input->setTimeout($timeout);

    try {
        // Fixed-size prefix: frame_type + channel_id + size
        $prefix_length = AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG;
        $this->wait_frame_reader->reuse($this->input->read($prefix_length));

        $frame_type = $this->wait_frame_reader->read_octet();
        $channel = $this->wait_frame_reader->read_short();
        $size = $this->wait_frame_reader->read_long();

        // Payload plus the trailing frame-end octet
        $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

        $payload = $this->wait_frame_reader->read($size);
        $frame_end = $this->wait_frame_reader->read_octet();
    } catch (AMQPTimeoutException $timed_out) {
        $this->input->setTimeout($previous_timeout);
        throw $timed_out;
    }

    $this->input->setTimeout($previous_timeout);

    if ($frame_end != 0xCE) {
        $message = sprintf('Framing error, unexpected byte: %x', $frame_end);
        throw new AMQPRuntimeException($message);
    }

    return array($frame_type, $channel, $payload);
}
||
| 535 | |||
/**
 * Waits for a frame from the server destined for a particular channel.
 *
 * Heartbeat frames are skipped. Frames for other channels are queued on
 * those channels. Returns array(frame type, payload).
 *
 * @param string $channel_id
 * @param int $timeout
 * @return array
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
 */
protected function wait_channel($channel_id, $timeout = 0)
{
    // Work on a copy so the caller's timeout value stays untouched
    $remaining = $timeout;

    while (true) {
        $started_at = time();
        list($frame_type, $frame_channel, $payload) = $this->wait_frame($remaining);

        $is_heartbeat = ($frame_channel === 0 && $frame_type === 8);
        if ($is_heartbeat) {
            // Skip the heartbeat and reduce the timeout by the time passed
            if ($remaining > 0) {
                $remaining -= time() - $started_at;
                if ($remaining <= 0) {
                    // Timeout reached: fail here without calling wait_frame again
                    throw new AMQPTimeoutException("Timeout waiting on channel");
                }
            }
            continue;
        }

        if ($frame_channel == $channel_id) {
            return array($frame_type, $payload);
        }

        // Not the channel we were looking for. Queue the frame for later,
        // when the other channel looks for frames. The channel may have been
        // closed by a previous exception, so check that it still exists.
        if (isset($this->channels[$frame_channel])) {
            array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
        }

        // A method queued for channel 0 (the connection itself) is probably
        // a close in reaction to some error, so deal with it right away
        if ($frame_type == 1 && $frame_channel == 0) {
            $this->wait();
        }
    }
}
||
| 585 | |||
/**
 * Fetches the channel object identified by the numeric channel_id, or
 * creates it if it does not exist yet. Without an id, the first free one
 * is used.
 *
 * @param string $channel_id
 * @return AMQPChannel
 */
public function channel($channel_id = null)
{
    if (isset($this->channels[$channel_id])) {
        return $this->channels[$channel_id];
    }

    if (!$channel_id) {
        $channel_id = $this->get_free_channel_id();
    }

    $created = new AMQPChannel($this->connection, $channel_id);
    $this->channels[$channel_id] = $created;

    return $created;
}
||
| 606 | |||
/**
 * Requests a connection close and waits for the server's close-ok.
 * Does nothing (returns null) when there is no protocol writer or the
 * connection is already flagged as closed.
 *
 * @param int $reply_code
 * @param string $reply_text
 * @param array $method_sig
 * @return mixed|null
 */
public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
{
    $closable = $this->protocolWriter && $this->isConnected();
    if (!$closable) {
        return null;
    }

    $this->closeChannels();

    $close_frame = $this->protocolWriter->connectionClose(
        $reply_code,
        $reply_text,
        $method_sig[0],
        $method_sig[1]
    );
    list($class_id, $method_id, $args) = $close_frame;
    $this->send_method_frame(array($class_id, $method_id), $args);

    // Flagged as disconnected before the close-ok arrives
    $this->setIsConnected(false);

    $expected = array($this->waitHelper->get_wait('connection.close_ok'));

    return $this->wait($expected);
}
||
| 637 | |||
/**
 * Handles a close coming from the server: reads the reason, confirms the
 * close, and reports it to the caller as an exception.
 *
 * @param AMQPReader $args
 * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException always
 */
protected function connection_close($args)
{
    // Fields must be read in this order
    $reply_code = $args->read_short();
    $reply_text = $args->read_shortstr();
    $failed_class_id = $args->read_short();
    $failed_method_id = $args->read_short();

    $this->x_close_ok();

    $failed_method = array($failed_class_id, $failed_method_id);
    throw new AMQPProtocolConnectionException($reply_code, $reply_text, $failed_method);
}
||
| 653 | |||
/**
 * Confirms a connection close to the server, then tears down the local side.
 */
protected function x_close_ok()
{
    // The wait helper yields the signature as a comma-separated string
    $close_ok_sig = explode(',', $this->waitHelper->get_wait('connection.close_ok'));

    $this->send_method_frame($close_ok_sig);
    $this->do_close();
}
||
| 664 | |||
/**
 * Handles the server's confirmation of a connection close by tearing down
 * the local side.
 *
 * @param AMQPReader $args not used
 */
protected function connection_close_ok($args)
{
    $this->do_close();
}
|
| 672 | |||
/**
 * Sends the request to open the virtual host and waits for the reply.
 * Under protocol version 0.8 a redirect is accepted as a reply as well.
 *
 * @param string $virtual_host
 * @param string $capabilities
 * @param bool $insist
 * @return mixed result of wait() for the reply
 */
protected function x_open($virtual_host, $capabilities = '', $insist = false)
{
    $open_args = new AMQPWriter();
    $open_args->write_shortstr($virtual_host);
    $open_args->write_shortstr($capabilities);
    $open_args->write_bits(array($insist));
    $this->send_method_frame(array(10, 40), $open_args);

    $expected = array($this->waitHelper->get_wait('connection.open_ok'));
    if ($this->protocolVersion == '0.8') {
        array_push($expected, $this->waitHelper->get_wait('connection.redirect'));
    }

    return $this->wait($expected);
}
||
| 697 | |||
/**
 * Signals that the connection is ready; records the known hosts sent by
 * the server.
 *
 * @param AMQPReader $args
 */
protected function connection_open_ok($args)
{
    $hosts = $args->read_shortstr();
    $this->known_hosts = $hosts;

    $this->debug->debug_msg('Open OK! known_hosts: ' . $hosts);
}
|
| 708 | |||
/**
 * Handles the server asking the client to use a different server.
 *
 * @param AMQPReader $args
 * @return string the host to redirect to
 */
protected function connection_redirect($args)
{
    // Host comes first, known hosts second
    $target_host = $args->read_shortstr();
    $this->known_hosts = $args->read_shortstr();

    $message = sprintf(
        'Redirected to [%s], known_hosts [%s]',
        $target_host,
        $this->known_hosts
    );
    $this->debug->debug_msg($message);

    return $target_host;
}
||
| 727 | |||
/**
 * Security mechanism challenge.
 *
 * The challenge is read from the frame and discarded.
 * NOTE(review): no x_secure_ok() response is sent from here; confirm whether
 * challenge/response mechanisms are meant to be supported.
 *
 * @param AMQPReader $args
 */
protected function connection_secure($args)
{
    // Still consume the challenge from the reader; its value is not used
    $args->read_longstr();
}
||
| 737 | |||
/**
 * Security mechanism response.
 *
 * @param string $response
 */
protected function x_secure_ok($response)
{
    $secure_args = new AMQPWriter();
    $secure_args->write_longstr($response);

    $this->send_method_frame(array(10, 21), $secure_args);
}
||
| 747 | |||
/**
 * Starts connection negotiation: stores the protocol version, server
 * properties, mechanisms and locales announced by the server.
 *
 * @param AMQPReader $args
 */
protected function connection_start($args)
{
    // Fields are read in the order they appear in the frame
    $this->version_major = $args->read_octet();
    $this->version_minor = $args->read_octet();
    $this->server_properties = $args->read_table();

    // Mechanisms and locales arrive as space-separated lists
    $mechanism_list = $args->read_longstr();
    $this->mechanisms = explode(' ', $mechanism_list);
    $locale_list = $args->read_longstr();
    $this->locales = explode(' ', $locale_list);

    $this->debug->debug_connection_start(
        $this->version_major,
        $this->version_minor,
        $this->server_properties,
        $this->mechanisms,
        $this->locales
    );
}
|
| 769 | |||
| 770 | /** |
||
| 771 | * @param $client_properties |
||
| 772 | * @param $mechanism |
||
| 773 | * @param $response |
||
| 774 | * @param $locale |
||
| 775 | */ |
||
| 776 | 60 | protected function x_start_ok($client_properties, $mechanism, $response, $locale) |
|
| 777 | { |
||
| 778 | 60 | $args = new AMQPWriter(); |
|
| 779 | 60 | $args->write_table($client_properties); |
|
| 780 | 60 | $args->write_shortstr($mechanism); |
|
| 781 | 60 | $args->write_longstr($response); |
|
| 782 | 60 | $args->write_shortstr($locale); |
|
| 783 | 60 | $this->send_method_frame(array(10, 11), $args); |
|
|
0 ignored issues
–
show
$args is of type object<PhpAmqpLib\Wire\AMQPWriter>, but the function expects a string.
It seems like the type of the argument is not accepted by the function/method which you are calling. In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug. We suggest adding an explicit type cast, as in the following example:
function acceptsInteger($int) { }
$x = '123'; // string "123"
// Instead of
acceptsInteger($x);
// we recommend to use
acceptsInteger((integer) $x);
Loading history...
|
|||
| 784 | 60 | } |
|
| 785 | |||
| 786 | /** |
||
| 787 | * Proposes connection tuning parameters |
||
| 788 | * |
||
| 789 | * @param AMQPReader $args |
||
| 790 | */ |
||
| 791 | 60 | protected function connection_tune($args) |
|
| 792 | { |
||
| 793 | 60 | $v = $args->read_short(); |
|
| 794 | 60 | if ($v) { |
|
| 795 | $this->channel_max = $v; |
||
| 796 | } |
||
| 797 | |||
| 798 | 60 | $v = $args->read_long(); |
|
| 799 | 60 | if ($v) { |
|
| 800 | 60 | $this->frame_max = $v; |
|
|
0 ignored issues
–
show
The property
$frame_max was declared of type integer, but $v is of type string. Maybe add a type cast?
This check looks for assignments to scalar types that may be of the wrong type. To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.
$answer = 42;
$correct = false;
$correct = (bool) $answer;
Loading history...
|
|||
| 801 | 48 | } |
|
| 802 | |||
| 803 | // use server proposed value if not set |
||
| 804 | 60 | if ($this->heartbeat === null) { |
|
| 805 | $this->heartbeat = $args->read_short(); |
||
| 806 | } |
||
| 807 | |||
| 808 | 60 | $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat); |
|
| 809 | 60 | } |
|
| 810 | |||
| 811 | /** |
||
| 812 | * Negotiates connection tuning parameters |
||
| 813 | * |
||
| 814 | * @param $channel_max |
||
| 815 | * @param $frame_max |
||
| 816 | * @param $heartbeat |
||
| 817 | */ |
||
| 818 | 60 | protected function x_tune_ok($channel_max, $frame_max, $heartbeat) |
|
| 819 | { |
||
| 820 | 60 | $args = new AMQPWriter(); |
|
| 821 | 60 | $args->write_short($channel_max); |
|
| 822 | 60 | $args->write_long($frame_max); |
|
| 823 | 60 | $args->write_short($heartbeat); |
|
| 824 | 60 | $this->send_method_frame(array(10, 31), $args); |
|
|
0 ignored issues
–
show
$args is of type object<PhpAmqpLib\Wire\AMQPWriter>, but the function expects a string.
It seems like the type of the argument is not accepted by the function/method which you are calling. In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug. We suggest adding an explicit type cast, as in the following example:
function acceptsInteger($int) { }
$x = '123'; // string "123"
// Instead of
acceptsInteger($x);
// we recommend to use
acceptsInteger((integer) $x);
Loading history...
|
|||
| 825 | 60 | $this->wait_tune_ok = false; |
|
| 826 | 60 | } |
|
| 827 | |||
| 828 | /** |
||
| 829 | * @return SocketIO |
||
| 830 | */ |
||
| 831 | public function getSocket() |
||
| 832 | { |
||
| 833 | return $this->io->getSocket(); |
||
| 834 | } |
||
| 835 | |||
| 836 | /** |
||
| 837 | * @return \PhpAmqpLib\Wire\IO\AbstractIO |
||
| 838 | */ |
||
| 839 | 50 | protected function getIO() |
|
| 840 | { |
||
| 841 | 50 | return $this->io; |
|
| 842 | } |
||
| 843 | |||
| 844 | /** |
||
| 845 | * Handles connection blocked notifications |
||
| 846 | * |
||
| 847 | * @param AMQPReader $args |
||
| 848 | */ |
||
| 849 | protected function connection_blocked(AMQPReader $args) |
||
| 850 | { |
||
| 851 | // Call the block handler and pass in the reason |
||
| 852 | $this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr())); |
||
| 853 | } |
||
| 854 | |||
| 855 | /** |
||
| 856 | * Handles connection unblocked notifications |
||
| 857 | */ |
||
| 858 | protected function connection_unblocked(AMQPReader $args) |
||
|
0 ignored issues
–
show
|
|||
| 859 | { |
||
| 860 | // No args to an unblock event |
||
| 861 | $this->dispatch_to_handler($this->connection_unblock_handler, array()); |
||
| 862 | } |
||
| 863 | |||
| 864 | /** |
||
| 865 | * Sets a handler which is called whenever a connection.blocked is sent from the server |
||
| 866 | * |
||
| 867 | * @param callable $callback |
||
| 868 | */ |
||
| 869 | public function set_connection_block_handler($callback) |
||
| 870 | { |
||
| 871 | $this->connection_block_handler = $callback; |
||
| 872 | } |
||
| 873 | |||
| 874 | /** |
||
| 875 | * Sets a handler which is called whenever a connection.unblocked is sent from the server |
||
| 876 | * |
||
| 877 | * @param callable $callback |
||
| 878 | */ |
||
| 879 | public function set_connection_unblock_handler($callback) |
||
| 880 | { |
||
| 881 | $this->connection_unblock_handler = $callback; |
||
| 882 | } |
||
| 883 | |||
| 884 | /** |
||
| 885 | * Gets the connection status |
||
| 886 | * |
||
| 887 | * @return bool |
||
| 888 | */ |
||
| 889 | 60 | public function isConnected() |
|
| 890 | { |
||
| 891 | 60 | return $this->is_connected; |
|
| 892 | } |
||
| 893 | |||
| 894 | /** |
||
| 895 | * Set the connection status |
||
| 896 | * |
||
| 897 | * @param bool $is_connected |
||
| 898 | */ |
||
| 899 | 60 | protected function setIsConnected($is_connected) |
|
| 900 | { |
||
| 901 | 60 | $this->is_connected = (bool) $is_connected; |
|
| 902 | 60 | } |
|
| 903 | |||
| 904 | /** |
||
| 905 | * Closes all available channels |
||
| 906 | */ |
||
| 907 | 60 | protected function closeChannels() |
|
| 908 | { |
||
| 909 | 60 | foreach ($this->channels as $key => $channel) { |
|
| 910 | // channels[0] is this connection object, so don't close it yet |
||
| 911 | 60 | if ($key === 0) { |
|
| 912 | 60 | continue; |
|
| 913 | } |
||
| 914 | try { |
||
| 915 | 25 | $channel->close(); |
|
| 916 | 25 | } catch (\Exception $e) { |
|
| 917 | /* Ignore closing errors */ |
||
| 918 | } |
||
| 919 | 48 | } |
|
| 920 | 60 | } |
|
| 921 | |||
| 922 | /** |
||
| 923 | * Should the connection be attempted during construction? |
||
| 924 | * |
||
| 925 | * @return bool |
||
| 926 | */ |
||
| 927 | 50 | public function connectOnConstruct() |
|
| 928 | { |
||
| 929 | 50 | return true; |
|
| 930 | } |
||
| 931 | } |
||
| 932 |
Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.
Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property.