Conditions | 31 |
Paths | 1 |
Total Lines | 175 |
Code Lines | 109 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
1 | <?php |
||
/**
 * Read callback: drains the input buffer one AMQP frame at a time.
 *
 * For every frame:
 *  1) read Frame::HEADER_SIZE bytes
 *  2) take the payload length from the header
 *  3) read payload + 1 byte (for Constants::FRAME_END) and append it to the header
 *  4) parse the buffer
 *  5) hand the frame to the handshake or the steady-state handler
 *  6) if bev still contains data, go to 1
 *
 * While the handshake is in progress exactly one frame is handled per call.
 *
 * @return void
 */
protected function onRead()
{
    if ($this->getInputLength() <= 0) {
        return;
    }

    // set busy for connection
    $this->busy = true;

    do {
        $frame = $this->readNextFrame();
        if ($frame === null) {
            // NOTE(review): busy stays true and checkFree() is skipped on this
            // path; kept exactly as before - confirm this is intended.
            return;
        }

        if ($this->debug) {
            $this->log(sprintf('[AMQP] %s packet received', get_class($frame)));
        }

        if (!$this->isHandshaked) {
            // NOTE(review): any further frames already buffered are left for
            // the next read event - confirm the broker never pipelines here.
            $this->processHandshakeFrame($frame);
            break;
        }

        if (!$this->routeFrame($frame)) {
            // The connection was closed by the handler; nothing left to free.
            return;
        }
    } while ($this->bev && $this->getInputLength() > 0);

    $this->checkFree();
}

/**
 * Reads one frame's worth of bytes from the input and feeds it to the parser.
 *
 * NOTE(review): the results of readExact() are used unchecked; if it can
 * signal "not enough data" the header bytes would already be consumed -
 * verify against the stream implementation.
 *
 * @return mixed whatever $this->parser->feed() returns; onRead() treats null as "no frame"
 */
private function readNextFrame()
{
    $header = $this->readExact(Frame::HEADER_SIZE);

    // The payload length field follows the type and channel fields.
    $payloadLengthOffset = Frame::HEADER_TYPE_SIZE + Frame::HEADER_CHANNEL_SIZE;
    $payloadLength = Binary::b2i(
        substr($header, $payloadLengthOffset, Frame::HEADER_PAYLOAD_LENGTH_SIZE)
    );

    // + 1 byte for the trailing Constants::FRAME_END marker.
    $buffer = $header . $this->readExact($payloadLength + 1);

    return $this->parser->feed($buffer);
}

/**
 * Advances the connection handshake according to the type of the received frame.
 *
 * Frames of any other type are ignored while the handshake is in progress.
 *
 * @param object $frame parsed frame
 *
 * @return void
 */
private function processHandshakeFrame($frame)
{
    if ($frame instanceof ConnectionStartFrame) {
        $this->processConnectionStart($frame);
    } elseif ($frame instanceof ConnectionTuneFrame) {
        $this->processConnectionTune($frame);
    } elseif ($frame instanceof ConnectionOpenOkFrame) {
        $this->isHandshaked = true;
        $this->openChannel(function ($channel) {
            $this->trigger(self::EVENT_ON_HANDSHAKE, $channel);
        });
    }
}

/**
 * Validates the broker's Start frame, records its features and answers with StartOk.
 *
 * Throws the exception built by AMQPConnectionException::handshakeFailed() when
 * the broker reports a version other than 0.9 or does not list AMQPLAIN.
 *
 * @param ConnectionStartFrame $frame
 *
 * @return void
 */
private function processConnectionStart(ConnectionStartFrame $frame)
{
    if ($frame->versionMajor !== 0 || $frame->versionMinor !== 9) {
        throw AMQPConnectionException::handshakeFailed(
            $this->connectionOptions,
            sprintf(
                'the broker reported an unexpected AMQP version (v%d.%d)',
                $frame->versionMajor,
                $frame->versionMinor
            )
        );
    }
    if (!preg_match('/\bAMQPLAIN\b/', $frame->mechanisms)) {
        throw AMQPConnectionException::handshakeFailed(
            $this->connectionOptions,
            'the AMQPLAIN authentication mechanism is not supported'
        );
    }

    $this->features = new Features();
    $properties = $frame->serverProperties;
    if (isset($properties['product']) && 'RabbitMQ' === $properties['product']) {
        $this->features->qosSizeLimits = false;
    }
    if (array_key_exists('capabilities', $properties)) {
        $capabilities = $properties['capabilities'];
        if (array_key_exists('per_consumer_qos', $capabilities)) {
            $this->features->perConsumerQos = (bool)$capabilities['per_consumer_qos'];
        }
        if (array_key_exists('exchange_exchange_bindings', $capabilities)) {
            $this->features->exchangeToExchangeBindings = (bool)$capabilities['exchange_exchange_bindings'];
        }
    }

    // Serialize credentials in "AMQPLAIN" format, which is essentially an
    // AMQP table without the 4-byte size header: each entry is a
    // length-prefixed key, the type tag "S" and a pack('N') length before the value.
    $user = $this->connectionOptions->getUsername();
    $pass = $this->connectionOptions->getPassword();

    $credentials = "\x05LOGINS" . pack('N', strlen($user)) . $user
        . "\x08PASSWORDS" . pack('N', strlen($pass)) . $pass;

    $this->command(ConnectionStartOkFrame::create(
        [
            'product' => $this->connectionOptions->getProductName(),
            'version' => $this->connectionOptions->getProductVersion(),
            'platform' => PackageInfo::AMQP_PLATFORM,
            'copyright' => PackageInfo::AMQP_COPYRIGHT,
            'information' => PackageInfo::AMQP_INFORMATION,
        ],
        'AMQPLAIN',
        $credentials
    ));
}

/**
 * Negotiates channel, frame-size and heartbeat limits, then sends TuneOk and Open.
 *
 * @param ConnectionTuneFrame $frame
 *
 * @return void
 */
private function processConnectionTune(ConnectionTuneFrame $frame)
{
    // Use our own ceiling unless the broker advertises a real, lower limit.
    $this->maximumChannelCount = self::MAXIMUM_CHANNELS;
    if ($frame->channelMax !== self::UNLIMITED_CHANNELS
        && $frame->channelMax < self::MAXIMUM_CHANNELS) {
        $this->maximumChannelCount = $frame->channelMax;
    }

    $this->maximumFrameSize = self::MAXIMUM_FRAME_SIZE;
    if ($frame->frameMax !== self::UNLIMITED_FRAME_SIZE
        && $frame->frameMax < self::MAXIMUM_FRAME_SIZE) {
        $this->maximumFrameSize = $frame->frameMax;
    }

    $heartbeatInterval = 0;
    if (!self::HEARTBEAT_DISABLED) {
        // Take the broker's value when none is configured or when it is smaller.
        // NOTE(review): a broker value of 0 therefore overrides any positive
        // configured interval - confirm this is the intended negotiation.
        $heartbeatInterval = $this->connectionOptions->getHeartbeatInterval();
        if (null === $heartbeatInterval || $frame->heartbeat < $heartbeatInterval) {
            $heartbeatInterval = $frame->heartbeat;
        }
    }

    $tuneOk = ConnectionTuneOkFrame::create(
        $this->maximumChannelCount,
        $this->maximumFrameSize,
        $heartbeatInterval
    );

    if ($tuneOk->heartbeat > 0) {
        // We need to set timeout value = negotiated heartbeat + 5 sec
        $timeout = $tuneOk->heartbeat + 5;
        $this->setTimeout($timeout);
        $this->connectionOptions->setHeartbeatInterval($tuneOk->heartbeat);
        $this->connectionOptions->setConnectionTimeout($timeout);
    }

    $this->command($tuneOk);

    $this->command(ConnectionOpenFrame::create(
        $this->connectionOptions->getVhost()
    ));
}

/**
 * Handles a frame received after the handshake has completed.
 *
 * Heartbeats are echoed back, a Close frame triggers the close event and
 * closes the connection, and everything else is triggered on the owning
 * channel when one is registered, otherwise on the connection itself.
 *
 * @param object $frame parsed frame
 *
 * @return bool false when the connection was closed, true otherwise
 */
private function routeFrame($frame)
{
    if ($frame instanceof HeartbeatFrame) {
        $this->command($frame);
        return true;
    }

    if ($frame instanceof ConnectionCloseFrame) {
        $this->trigger(self::EVENT_ON_CONNECTION_CLOSE, $frame->replyCode, $frame->replyText);
        $this->close();
        return false;
    }

    if (isset($frame->frameChannelId)
        && $frame->frameChannelId > 0
        && array_key_exists($frame->frameChannelId, $this->channels)) {
        /** @var Channel $channel */
        $channel = $this->channels[$frame->frameChannelId];
        $channel->trigger(get_class($frame), $frame);
        return true;
    }

    $this->trigger(get_class($frame), $frame);
    return true;
}
||
352 | |||
482 |
Overly long lines are hard to read on any screen. Most code styles therefore impose a maximum limit on the number of characters in a line.