videlalvaro /
php-amqplib
This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
| 1 | <?php |
||
| 2 | namespace PhpAmqpLib\Wire\IO; |
||
| 3 | |||
| 4 | use PhpAmqpLib\Exception\AMQPIOException; |
||
| 5 | use PhpAmqpLib\Exception\AMQPRuntimeException; |
||
| 6 | use PhpAmqpLib\Exception\AMQPTimeoutException; |
||
| 7 | use PhpAmqpLib\Helper\MiscHelper; |
||
| 8 | use PhpAmqpLib\Wire\AMQPWriter; |
||
| 9 | |||
/**
 * AMQP transport implemented on top of PHP's stream functions
 * (stream_socket_client / fread / fwrite / stream_select).
 *
 * A plain TCP connection is used unless a stream context is passed to the
 * constructor, in which case the connection is opened with the "ssl" scheme.
 */
class StreamIO extends AbstractIO
{
    /** @var string URI scheme used when connecting: 'tcp', or 'ssl' when a context was supplied */
    protected $protocol;

    /** @var string */
    protected $host;

    /** @var int */
    protected $port;

    /** @var int Timeout handed to stream_socket_client() */
    protected $connection_timeout;

    /** @var int Timeout applied to the socket via stream_set_timeout() */
    protected $read_write_timeout;

    /** @var resource Stream context used for the connection */
    protected $context;

    /** @var bool Whether SO_KEEPALIVE is enabled after connecting */
    protected $keepalive;

    /** @var int Heartbeat interval; 0 disables the heartbeat checks */
    protected $heartbeat;

    /** @var float|null microtime(true) of the last completed read, null until one happened on the current socket */
    protected $last_read;

    /** @var float|null microtime(true) of the last completed write, null until one happened on the current socket */
    protected $last_write;

    /** @var array Details of the last error seen by error_handler() */
    protected $last_error;

    /** @var resource|null */
    private $sock;

    /** @var bool Whether stream_select() may be called with a null (infinite) timeout */
    private $canSelectNull;

    /** @var bool Whether pcntl signals are dispatched while waiting for data */
    private $canDispatchPcntlSignal;

    /**
     * @param string        $host
     * @param int           $port
     * @param int           $connection_timeout
     * @param int           $read_write_timeout
     * @param resource|null $context            Stream context; passing one switches the protocol to "ssl"
     * @param bool          $keepalive
     * @param int           $heartbeat
     */
    public function __construct(
        $host,
        $port,
        $connection_timeout,
        $read_write_timeout,
        $context = null,
        $keepalive = false,
        $heartbeat = 0
    ) {
        $this->protocol = 'tcp';
        $this->host = $host;
        $this->port = $port;
        $this->connection_timeout = $connection_timeout;
        $this->read_write_timeout = $read_write_timeout;
        $this->context = $context;
        $this->keepalive = $keepalive;
        $this->heartbeat = $heartbeat;
        $this->canSelectNull = true;
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();

        if (is_null($this->context)) {
            $this->context = stream_context_create();
        } else {
            $this->protocol = 'ssl';
            // php bugs 41631 & 65137 prevent select null from working on ssl streams
            if (PHP_VERSION_ID < 50436) {
                $this->canSelectNull = false;
            }
        }
    }

    /**
     * Tells whether signals can be dispatched while waiting on the socket.
     *
     * Requires the pcntl extension, pcntl_signal_dispatch(), and the constant
     * AMQP_WITHOUT_SIGNALS to be defined with a falsy value.
     *
     * @return bool
     */
    private function isPcntlSignalEnabled()
    {
        return extension_loaded('pcntl')
            && function_exists('pcntl_signal_dispatch')
            && (defined('AMQP_WITHOUT_SIGNALS') && !AMQP_WITHOUT_SIGNALS);
    }

    /**
     * Sets up the stream connection
     *
     * Opens the socket, applies the read/write timeout, selects blocking or
     * non-blocking mode and optionally enables keepalive.
     *
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the connection cannot be established
     * @throws \PhpAmqpLib\Exception\AMQPIOException when the timeout or keepalive cannot be set
     * @throws \Exception raised by error_handler() for PHP errors emitted while connecting
     */
    public function connect()
    {
        $errstr = $errno = null;

        $remote = sprintf(
            '%s://%s:%s',
            $this->protocol,
            $this->host,
            $this->port
        );

        set_error_handler(array($this, 'error_handler'));

        $this->sock = stream_socket_client(
            $remote,
            $errno,
            $errstr,
            $this->connection_timeout,
            STREAM_CLIENT_CONNECT,
            $this->context
        );

        restore_error_handler();

        if (false === $this->sock) {
            throw new AMQPRuntimeException(
                sprintf(
                    'Error Connecting to server(%s): %s ',
                    $errno,
                    $errstr
                ),
                $errno
            );
        }

        if (false === stream_socket_get_name($this->sock, true)) {
            throw new AMQPRuntimeException(
                sprintf(
                    'Connection refused: %s ',
                    $remote
                )
            );
        }

        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_write_timeout);
        if (!stream_set_timeout($this->sock, $sec, $uSec)) {
            throw new AMQPIOException('Timeout could not be set');
        }

        // php cannot capture signals while streams are blocking
        if ($this->canDispatchPcntlSignal) {
            stream_set_blocking($this->sock, 0);
            stream_set_write_buffer($this->sock, 0);
            if (function_exists('stream_set_read_buffer')) {
                stream_set_read_buffer($this->sock, 0);
            }
        } else {
            stream_set_blocking($this->sock, 1);
        }

        if ($this->keepalive) {
            $this->enable_keepalive();
        }
    }

    /**
     * Reconnects the socket
     */
    public function reconnect()
    {
        $this->close();
        $this->connect();
    }

    /**
     * Reads exactly $len bytes from the socket.
     *
     * Loops until the requested amount has been received, running the
     * heartbeat check before every read attempt.
     *
     * @param int $len Number of bytes to read
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the socket is closed or the read fails
     * @return mixed|string
     */
    public function read($len)
    {
        $read = 0;
        $data = '';

        while ($read < $len) {
            $this->check_heartbeat();

            if (!is_resource($this->sock) || feof($this->sock)) {
                throw new AMQPRuntimeException('Broken pipe or closed connection');
            }

            set_error_handler(array($this, 'error_handler'));
            $buffer = fread($this->sock, ($len - $read));
            restore_error_handler();

            if ($buffer === false) {
                throw new AMQPRuntimeException('Error receiving data');
            }

            if ($buffer === '') {
                if ($this->canDispatchPcntlSignal) {
                    // prevent cpu from being consumed while waiting
                    if ($this->canSelectNull) {
                        $this->select(null, null);
                        pcntl_signal_dispatch();
                    } else {
                        usleep(100000);
                        pcntl_signal_dispatch();
                    }
                }
                continue;
            }

            // byte length, independent of any multibyte internal encoding
            $read += mb_strlen($buffer, 'ASCII');
            $data .= $buffer;
        }

        if (mb_strlen($data, 'ASCII') !== $len) {
            throw new AMQPRuntimeException(
                sprintf(
                    'Error reading data. Received %s instead of expected %s bytes',
                    mb_strlen($data, 'ASCII'),
                    $len
                )
            );
        }

        $this->last_read = microtime(true);
        return $data;
    }

    /**
     * Writes the whole of $data to the socket, in chunks of at most 8192 bytes.
     *
     * @param string $data
     * @return mixed|void
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the socket is closed or the write fails
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException when the socket reports a timeout
     */
    public function write($data)
    {
        $written = 0;
        $len = mb_strlen($data, 'ASCII');

        while ($written < $len) {

            if (!is_resource($this->sock)) {
                throw new AMQPRuntimeException('Broken pipe or closed connection');
            }

            set_error_handler(array($this, 'error_handler'));
            // OpenSSL's C library function SSL_write() can balk on buffers > 8192
            // bytes in length, so we're limiting the write size here. On both TLS
            // and plaintext connections, the write loop will continue until the
            // buffer has been fully written.
            // This behavior has been observed in OpenSSL dating back to at least
            // September 2002:
            // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
            $buffer = fwrite($this->sock, $data, 8192);
            restore_error_handler();

            if ($buffer === false) {
                throw new AMQPRuntimeException('Error sending data');
            }

            if ($buffer === 0 && feof($this->sock)) {
                throw new AMQPRuntimeException('Broken pipe or closed connection');
            }

            if ($this->timed_out()) {
                throw new AMQPTimeoutException('Error sending data. Socket connection timed out');
            }

            $written += $buffer;

            if ($buffer > 0) {
                // drop the bytes that were just sent from the front of the buffer
                $data = mb_substr($data, $buffer, mb_strlen($data, 'ASCII') - $buffer, 'ASCII');
            }
        }

        $this->last_write = microtime(true);
    }

    /**
     * Internal error handler to deal with stream and socket errors that need to be ignored
     *
     * Records the error in $last_error. "Resource temporarily unavailable" and
     * "Interrupted system call" are swallowed; anything else restores the
     * previous handler and is rethrown as an \ErrorException.
     *
     * @param  int $errno
     * @param  string $errstr
     * @param  string $errfile
     * @param  int $errline
     * @param  array $errcontext
     * @return null
     * @throws \ErrorException
     */
    public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
    {
        $this->last_error = compact('errno', 'errstr', 'errfile', 'errline', 'errcontext');

        // fwrite notice that the stream isn't ready
        if (strstr($errstr, 'Resource temporarily unavailable')) {
            // it's allowed to retry
            return null;
        }

        // stream_select warning that it has been interrupted by a signal
        if (strstr($errstr, 'Interrupted system call')) {
            // it's allowed while processing signals
            return null;
        }

        // the caller's restore_error_handler() is skipped once we throw, so undo it here
        restore_error_handler();

        // raise all other issues to exceptions
        throw new \ErrorException($errstr, 0, $errno, $errfile, $errline);
    }

    /**
     * Heartbeat logic: check connection health here
     *
     * Does nothing unless a heartbeat interval is set and both a read and a
     * write have completed on the current socket. Reconnects when nothing was
     * read for more than twice the interval, and sends a heartbeat when
     * nothing was written for more than half the interval.
     */
    protected function check_heartbeat()
    {
        // ignore unless heartbeat interval is set
        if ($this->heartbeat !== 0 && $this->last_read && $this->last_write) {
            $t = microtime(true);
            $t_read = round($t - $this->last_read);
            $t_write = round($t - $this->last_write);

            // server has gone away
            if (($this->heartbeat * 2) < $t_read) {
                $this->reconnect();
            }

            // time for client to send a heartbeat
            if (($this->heartbeat / 2) < $t_write) {
                $this->write_heartbeat();
            }
        }
    }

    /**
     * Sends a heartbeat message
     *
     * Writes octet 8, short 0, long 0 and octet 0xCE, which corresponds to an
     * AMQP heartbeat frame (type 8, channel 0, empty payload, frame-end marker).
     */
    protected function write_heartbeat()
    {
        $pkt = new AMQPWriter();
        $pkt->write_octet(8);
        $pkt->write_short(0);
        $pkt->write_long(0);
        $pkt->write_octet(0xCE);
        $this->write($pkt->getvalue());
    }

    /**
     * Closes the socket if it is open and forgets the I/O timestamps.
     *
     * The timestamps belong to the socket being closed; keeping them would make
     * check_heartbeat() judge a freshly reconnected socket by the idle time of
     * the old one and trigger a reconnect on every subsequent read attempt.
     */
    public function close()
    {
        if (is_resource($this->sock)) {
            fclose($this->sock);
        }
        $this->sock = null;
        $this->last_read = null;
        $this->last_write = null;
    }

    /**
     * @return resource
     */
    public function get_socket()
    {
        return $this->sock;
    }

    /**
     * Alias of get_socket().
     *
     * @return resource
     */
    public function getSocket()
    {
        return $this->get_socket();
    }

    /**
     * Waits until the socket becomes readable or the timeout elapses.
     *
     * @param int|null $sec  Seconds to wait; null is passed through to stream_select()
     * @param int|null $usec Microseconds to wait
     * @return int|mixed Result of stream_select()
     */
    public function select($sec, $usec)
    {
        $read = array($this->sock);
        $write = null;
        $except = null;

        set_error_handler(array($this, 'error_handler'));
        $result = stream_select($read, $write, $except, $sec, $usec);
        restore_error_handler();

        return $result;
    }

    /**
     * Reports the 'timed_out' flag from the socket's stream metadata.
     *
     * @return mixed
     */
    protected function timed_out()
    {
        // get status of socket to determine whether or not it has timed out
        $info = stream_get_meta_data($this->sock);

        return $info['timed_out'];
    }

    /**
     * Turns on SO_KEEPALIVE for the underlying socket.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException when the sockets API is unavailable
     *                                               or the stream cannot be imported
     */
    protected function enable_keepalive()
    {
        if (!function_exists('socket_import_stream')) {
            throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
        }

        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
        }

        $socket = socket_import_stream($this->sock);
        if (!$socket) {
            // a failed import must not be handed on to socket_set_option()
            throw new AMQPIOException('Can not enable keepalive: stream could not be imported as a socket');
        }

        socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
    }
}
||
| 431 |
This check looks for variable assignments that are either overwritten by other assignments or where the variable is not used subsequently.
Both the
$myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.