Conditions | 33 |
Paths | 1 |
Total Lines | 182 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include: Extract Method.
If many parameters/temporary variables are present: Replace Temp with Query, Introduce Parameter Object, or Preserve Whole Object.
1 | <?php |
||
/**
 * Read callback: consumes complete AMQP frames from the input buffer and
 * processes them.
 *
 * For every frame:
 *  1) read Frame::HEADER_SIZE bytes
 *  2) get the payload length from the header
 *  3) read payload + 1 byte (for Constants::FRAME_END) and append it to the header
 *  4) parse the resulting buffer
 *  5) handle the frame (handshake step, heartbeat, close, or dispatch)
 *  6) if the buffer event still holds data, go to 1
 *
 * While the handshake is not finished only a single frame is handled per
 * call; afterwards frames are handled in a loop until the input is drained.
 *
 * @return void
 *
 * @throws AMQPConnectionException when the broker's Connection.Start frame
 *                                 reports an unexpected protocol version or
 *                                 does not offer the AMQPLAIN mechanism
 */
protected function onRead()
{
    if ($this->getInputLength() <= 0) {
        return;
    }

    // set busy for connection
    $this->busy = true;

    do {
        $frame = $this->readNextFrame();
        if ($frame === null) {
            // NOTE(review): as in the original code, this path leaves without
            // calling checkFree() -- confirm that is intended.
            return;
        }

        if ($this->debug) {
            $this->log(sprintf('[AMQP] %s packet received', get_class($frame)));
        }

        if (!$this->isHandshaked) {
            $this->handleHandshakeFrame($frame);
            $this->checkFree();

            return;
        }

        if ($frame instanceof HeartbeatFrame) {
            // The received heartbeat frame is sent straight back.
            $this->command($frame);
        } elseif ($frame instanceof ConnectionCloseFrame) {
            $this->trigger(self::EVENT_ON_CONNECTION_CLOSE, $frame->replyCode, $frame->replyText);
            $this->close();

            return;
        } else {
            $this->dispatchFrame($frame);
        }
    } while ($this->bev && $this->getInputLength() > 0);

    $this->checkFree();
}

/**
 * Reads one frame (header, payload and frame-end byte) and feeds it to the parser.
 *
 * @return mixed|null the value returned by the parser, or null when the header
 *                    or the payload could not be read (readExact() returned false)
 */
private function readNextFrame()
{
    $header = $this->readExact(Frame::HEADER_SIZE);
    if ($header === false) {
        return null;
    }

    // The payload length field follows the type and channel fields in the header.
    $framePayloadSize = Binary::b2i(substr(
        $header,
        Frame::HEADER_TYPE_SIZE + Frame::HEADER_CHANNEL_SIZE,
        Frame::HEADER_PAYLOAD_LENGTH_SIZE
    ));

    // + 1 byte for the trailing Constants::FRAME_END marker.
    $payload = $this->readExact($framePayloadSize + 1);
    if ($payload === false) {
        return null;
    }

    return $this->parser->feed($header . $payload);
}

/**
 * Handles a frame received before the handshake is complete.
 * Frames other than Connection.Start / Tune / OpenOk are ignored.
 *
 * @param object $frame parsed frame
 *
 * @return void
 *
 * @throws AMQPConnectionException see onConnectionStartFrame()
 */
private function handleHandshakeFrame($frame)
{
    if ($frame instanceof ConnectionStartFrame) {
        $this->onConnectionStartFrame($frame);
    } elseif ($frame instanceof ConnectionTuneFrame) {
        $this->onConnectionTuneFrame($frame);
    } elseif ($frame instanceof ConnectionOpenOkFrame) {
        $this->isHandshaked = true;
        $this->openChannel(function ($channel) {
            $this->trigger(self::EVENT_ON_HANDSHAKE, $channel);
        });
    }
}

/**
 * Validates the broker's Connection.Start frame, records the advertised
 * features and answers with Connection.StartOk using AMQPLAIN credentials.
 *
 * @param ConnectionStartFrame $frame
 *
 * @return void
 *
 * @throws AMQPConnectionException when the version is not 0.9 or AMQPLAIN is not offered
 */
private function onConnectionStartFrame(ConnectionStartFrame $frame)
{
    if ($frame->versionMajor !== 0 || $frame->versionMinor !== 9) {
        throw AMQPConnectionException::handshakeFailed(
            $this->connectionOptions,
            sprintf(
                'the broker reported an unexpected AMQP version (v%d.%d)',
                $frame->versionMajor,
                $frame->versionMinor
            )
        );
    }
    if (!preg_match('/\bAMQPLAIN\b/', $frame->mechanisms)) {
        throw AMQPConnectionException::handshakeFailed(
            $this->connectionOptions,
            'the AMQPLAIN authentication mechanism is not supported'
        );
    }

    $this->features = new Features();
    $properties = $frame->serverProperties;
    if (isset($properties['product']) && 'RabbitMQ' === $properties['product']) {
        $this->features->qosSizeLimits = false;
    }
    if (array_key_exists('capabilities', $properties)) {
        $capabilities = $properties['capabilities'];
        if (array_key_exists('per_consumer_qos', $capabilities)) {
            $this->features->perConsumerQos = (bool)$capabilities['per_consumer_qos'];
        }
        if (array_key_exists('exchange_exchange_bindings', $capabilities)) {
            $this->features->exchangeToExchangeBindings = (bool)$capabilities['exchange_exchange_bindings'];
        }
    }

    // Serialize credentials in "AMQPLAIN" format, which is essentially an
    // AMQP table without the 4-byte size header ...
    $user = $this->connectionOptions->getUsername();
    $pass = $this->connectionOptions->getPassword();

    $credentials = "\x05LOGINS" . pack('N', strlen($user)) . $user
        . "\x08PASSWORDS" . pack('N', strlen($pass)) . $pass;

    $this->command(ConnectionStartOkFrame::create(
        [
            'product' => $this->connectionOptions->getProductName(),
            'version' => $this->connectionOptions->getProductVersion(),
            'platform' => PackageInfo::AMQP_PLATFORM,
            'copyright' => PackageInfo::AMQP_COPYRIGHT,
            'information' => PackageInfo::AMQP_INFORMATION,
        ],
        'AMQPLAIN',
        $credentials
    ));
}

/**
 * Negotiates channel count, frame size and heartbeat from the broker's
 * Connection.Tune frame, then sends Connection.TuneOk followed by Connection.Open.
 *
 * @param ConnectionTuneFrame $frame
 *
 * @return void
 */
private function onConnectionTuneFrame(ConnectionTuneFrame $frame)
{
    // Use our own maximum unless the broker announces a smaller, non-"unlimited" limit.
    $this->maximumChannelCount = self::MAXIMUM_CHANNELS;
    if ($frame->channelMax !== self::UNLIMITED_CHANNELS && $frame->channelMax < self::MAXIMUM_CHANNELS) {
        $this->maximumChannelCount = $frame->channelMax;
    }

    $this->maximumFrameSize = self::MAXIMUM_FRAME_SIZE;
    if ($frame->frameMax !== self::UNLIMITED_FRAME_SIZE && $frame->frameMax < self::MAXIMUM_FRAME_SIZE) {
        $this->maximumFrameSize = $frame->frameMax;
    }

    $heartbeatInterval = 0;
    if (!self::HEARTBEAT_DISABLED) {
        // Take the configured interval, falling back to (or capped by) the broker's value.
        $heartbeatInterval = $this->connectionOptions->getHeartbeatInterval();
        if (null === $heartbeatInterval || $frame->heartbeat < $heartbeatInterval) {
            $heartbeatInterval = $frame->heartbeat;
        }
    }

    $tuneOkFrame = ConnectionTuneOkFrame::create(
        $this->maximumChannelCount,
        $this->maximumFrameSize,
        $heartbeatInterval
    );

    if ($tuneOkFrame->heartbeat > 0) {
        // We need to set timeout value = negotiated heartbeat + 5 sec
        $timeout = $tuneOkFrame->heartbeat + 5;
        $this->setTimeout($timeout);
        $this->connectionOptions->setHeartbeatInterval($tuneOkFrame->heartbeat);
        $this->connectionOptions->setConnectionTimeout($timeout);
    }

    $this->command($tuneOkFrame);
    $this->command(ConnectionOpenFrame::create(
        $this->connectionOptions->getVhost()
    ));
}

/**
 * Triggers an event named after the frame's class: on the matching open
 * channel when the frame carries a known channel id > 0, otherwise on the
 * connection itself.
 *
 * @param object $frame parsed frame
 *
 * @return void
 */
private function dispatchFrame($frame)
{
    if (isset($frame->frameChannelId)
        && $frame->frameChannelId > 0
        && array_key_exists($frame->frameChannelId, $this->channels)) {
        /** @var Channel $channel */
        $channel = $this->channels[$frame->frameChannelId];
        $channel->trigger(get_class($frame), $frame);

        return;
    }

    $this->trigger(get_class($frame), $frame);
}
||
359 | |||
489 |
The break statement is not necessary if it is preceded, for example, by a return statement.
If you would like to keep this construct to be consistent with other case statements, you can safely mark this issue as a false-positive.