| Conditions | 33 |
| Paths | 1 |
| Total Lines | 182 |
| Code Lines | 114 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
| 1 | <?php |
||
/**
 * Reads AMQP frames from the input buffer and dispatches each of them.
 *
 * Per frame:
 *  1) read Frame::HEADER_SIZE bytes
 *  2) take the payload length from the header
 *  3) read payload + 1 byte (for Constants::FRAME_END) and append it to the header
 *  4) feed the buffer to the parser
 *  5) handle the parsed frame (handshake step, heartbeat, close, or event dispatch)
 *  6) repeat while the buffer still holds data
 *
 * While the connection is not handshaked only one frame is handled per call;
 * the method then calls checkFree() and returns.
 *
 * @return void
 * @throws AMQPConnectionException presumably: it throws whatever
 *         AMQPConnectionException::handshakeFailed() builds when the broker
 *         reports an AMQP version other than 0.9 or does not offer AMQPLAIN.
 */
protected function onRead()
{
    if ($this->getInputLength() <= 0) {
        return;
    }

    // set busy for connection
    $this->busy = true;

    do {
        $header = $this->readExact(Frame::HEADER_SIZE);
        if ($header === false) {
            return;
        }

        // The payload length field follows the type and channel fields of the header.
        $framePayloadSize = Binary::b2i(
            substr(
                $header,
                Frame::HEADER_TYPE_SIZE + Frame::HEADER_CHANNEL_SIZE,
                Frame::HEADER_PAYLOAD_LENGTH_SIZE
            )
        );

        // NOTE(review): if readExact() consumes the header and the payload is not
        // fully buffered yet, the header bytes read above look lost for the next
        // call. Confirm readExact() semantics; a peek/unread would be needed here.
        $payload = $this->readExact($framePayloadSize + 1);
        if ($payload === false) {
            return;
        }

        $frame = $this->parser->feed($header . $payload);
        if ($frame === null) {
            return;
        }

        if ($this->debug) {
            $this->log(sprintf('[AMQP] %s packet received', get_class($frame)));
        }

        if (!$this->isHandshaked) {
            if ($frame instanceof ConnectionStartFrame) {
                if ($frame->versionMajor !== 0 || $frame->versionMinor !== 9) {
                    throw AMQPConnectionException::handshakeFailed(
                        $this->connectionOptions,
                        sprintf(
                            'the broker reported an unexpected AMQP version (v%d.%d)',
                            $frame->versionMajor,
                            $frame->versionMinor
                        )
                    );
                }
                if (!preg_match('/\bAMQPLAIN\b/', $frame->mechanisms)) {
                    throw AMQPConnectionException::handshakeFailed(
                        $this->connectionOptions,
                        'the AMQPLAIN authentication mechanism is not supported'
                    );
                }

                // Derive the feature flags from the properties the broker announced.
                $this->features = new Features();
                $properties = $frame->serverProperties;
                if (isset($properties['product']) && 'RabbitMQ' === $properties['product']) {
                    $this->features->qosSizeLimits = false;
                }
                // Broker-supplied data: only inspect capabilities when it really is an array.
                if (isset($properties['capabilities']) && is_array($properties['capabilities'])) {
                    $capabilities = $properties['capabilities'];
                    if (array_key_exists('per_consumer_qos', $capabilities)) {
                        $this->features->perConsumerQos = (bool)$capabilities['per_consumer_qos'];
                    }
                    if (array_key_exists('exchange_exchange_bindings', $capabilities)) {
                        $this->features->exchangeToExchangeBindings = (bool)$capabilities['exchange_exchange_bindings'];
                    }
                }

                // Serialize credentials in "AMQPLAIN" format, which is essentially an
                // AMQP table without the 4-byte size header ...
                $user = $this->connectionOptions->getUsername();
                $pass = $this->connectionOptions->getPassword();

                $credentials = "\x05LOGINS" . pack('N', strlen($user)) . $user
                    . "\x08PASSWORDS" . pack('N', strlen($pass)) . $pass;

                $this->command(ConnectionStartOkFrame::create(
                    [
                        'product' => $this->connectionOptions->getProductName(),
                        'version' => $this->connectionOptions->getProductVersion(),
                        'platform' => PackageInfo::AMQP_PLATFORM,
                        'copyright' => PackageInfo::AMQP_COPYRIGHT,
                        'information' => PackageInfo::AMQP_INFORMATION,
                    ],
                    'AMQPLAIN',
                    $credentials
                ));
            } elseif ($frame instanceof ConnectionTuneFrame) {
                // Use the broker's limit only when it is a real limit below our own maximum.
                $this->maximumChannelCount = self::MAXIMUM_CHANNELS;
                if ($frame->channelMax !== self::UNLIMITED_CHANNELS
                    && $frame->channelMax < self::MAXIMUM_CHANNELS) {
                    $this->maximumChannelCount = $frame->channelMax;
                }

                $this->maximumFrameSize = self::MAXIMUM_FRAME_SIZE;
                if ($frame->frameMax !== self::UNLIMITED_FRAME_SIZE
                    && $frame->frameMax < self::MAXIMUM_FRAME_SIZE) {
                    $this->maximumFrameSize = $frame->frameMax;
                }

                // Take the configured heartbeat unless it is unset or the broker proposes a smaller one.
                $heartbeatInterval = 0;
                if (!self::HEARTBEAT_DISABLED) {
                    $heartbeatInterval = $this->connectionOptions->getHeartbeatInterval();
                    if (null === $heartbeatInterval || $frame->heartbeat < $heartbeatInterval) {
                        $heartbeatInterval = $frame->heartbeat;
                    }
                }

                $outputFrame = ConnectionTuneOkFrame::create(
                    $this->maximumChannelCount,
                    $this->maximumFrameSize,
                    $heartbeatInterval
                );

                if ($outputFrame->heartbeat > 0) {
                    // We need to set timeout value = ConnectionTuneFrame::heartbeat + 5 sec
                    $timeout = $outputFrame->heartbeat + 5;
                    $this->setTimeout($timeout);
                    $this->connectionOptions->setHeartbeatInterval($outputFrame->heartbeat);
                    $this->connectionOptions->setConnectionTimeout($timeout);
                }

                $this->command($outputFrame);

                $this->command(ConnectionOpenFrame::create(
                    $this->connectionOptions->getVhost()
                ));
            } elseif ($frame instanceof ConnectionOpenOkFrame) {
                $this->isHandshaked = true;
                $this->openChannel(function ($channel) {
                    $this->trigger(self::EVENT_ON_HANDSHAKE, $channel);
                });
            }

            // During the handshake exactly one frame is handled per call.
            $this->checkFree();
            return;
        }

        if ($frame instanceof HeartbeatFrame) {
            // Send the received heartbeat frame straight back.
            $this->command($frame);
        } elseif ($frame instanceof ConnectionCloseFrame) {
            $this->trigger(self::EVENT_ON_CONNECTION_CLOSE, $frame->replyCode, $frame->replyText);
            $this->close();
            return;
        } elseif (isset($frame->frameChannelId)
            && $frame->frameChannelId > 0
            && array_key_exists($frame->frameChannelId, $this->channels)) {
            // Frames addressed to a known channel are dispatched on that channel.
            /** @var Channel $channel */
            $channel = $this->channels[$frame->frameChannelId];
            $channel->trigger(get_class($frame), $frame);
        } else {
            // Everything else is dispatched on the connection itself.
            $this->trigger(get_class($frame), $frame);
        }
    } while ($this->bev && $this->getInputLength() > 0);

    $this->checkFree();
}
||
| 359 | |||
| 489 |
Overly long lines are hard to read on any screen. Most code styles therefore impose a maximum limit on the number of characters in a line.