This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace PHPDaemon\Clients\AMQP; |
||
4 | |||
5 | use PHPDaemon\Clients\AMQP\Driver\ConnectionOptions; |
||
6 | use PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException; |
||
7 | use PHPDaemon\Clients\AMQP\Driver\Features; |
||
8 | use PHPDaemon\Clients\AMQP\Driver\PackageInfo; |
||
9 | use PHPDaemon\Clients\AMQP\Driver\Protocol\CommandInterface; |
||
10 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Parser\Frame; |
||
11 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Parser\Frame as FrameParser; |
||
12 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Parser\Table as TableParser; |
||
13 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionCloseFrame; |
||
14 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionOpenFrame; |
||
15 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionOpenOkFrame; |
||
16 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionStartFrame; |
||
17 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionStartOkFrame; |
||
18 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionTuneFrame; |
||
19 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionTuneOkFrame; |
||
20 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\HeartbeatFrame; |
||
21 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\OutgoingFrame; |
||
22 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Serializer\Frame as FrameSerializer; |
||
23 | use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Serializer\Table as TableSerializer; |
||
24 | use PHPDaemon\Network\ClientConnection; |
||
25 | use PHPDaemon\Utils\Binary; |
||
26 | |||
/**
 * Class Connection
 *
 * Client side of a single AMQP 0-9-1 connection.
 *
 * Once the transport is ready the protocol header is written, then the
 * Start / Tune / Open handshake frames sent by the broker are answered.
 * After the handshake every incoming frame is either handled here
 * (heartbeat, connection close) or forwarded as an event to the channel it
 * belongs to.
 *
 * @author Aleksey I. Kuleshov YOU GLOBAL LIMITED
 * @package PHPDaemon\Clients\AMQP
 */
class Connection extends ClientConnection implements CommandInterface
{

    /**
     * The AMQP protocol header is sent by the client before any frame-based
     * communication takes place. It is the only data transferred that is not
     * a frame.
     */
    const PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01";

    /**
     * The maximum number of channels.
     *
     * AMQP channel ID is 2 bytes, but zero is reserved for connection-level
     * communication.
     */
    const MAXIMUM_CHANNELS = 0xffff - 1;

    /**
     * The maximum frame size the client supports.
     *
     * Note: RabbitMQ's default is 0x20000 (128 KB), our limit is higher to
     * allow for some server-side configurability.
     */
    const MAXIMUM_FRAME_SIZE = 0x80000; // 512 KB

    /**
     * The broker sends channelMax of zero in the tune frame if it does not
     * impose a channel limit.
     */
    const UNLIMITED_CHANNELS = 0;

    /**
     * The broker sends frameMax of zero in the tune frame if it does not impose
     * a frame size limit.
     */
    const UNLIMITED_FRAME_SIZE = 0;

    /**
     * The broker sends a heartbeat of zero in the tune frame if it does not use
     * heartbeats.
     */
    const HEARTBEAT_DISABLED = 0;

    /**
     * Event raised when protocol handshake ready
     */
    const EVENT_ON_HANDSHAKE = 'event.amqp.connection.handshake';

    /**
     * Event raised when connection close frame incoming
     */
    const EVENT_ON_CONNECTION_CLOSE = 'event.amqp.connection.close';

    /**
     * Turns raw frame bytes into frame objects.
     * @var FrameParser
     */
    protected $parser;

    /**
     * Turns outgoing frame objects into raw bytes.
     * @var FrameSerializer
     */
    protected $serializer;

    /**
     * Options built from the pool configuration in init().
     * @var ConnectionOptions
     */
    protected $connectionOptions;

    /**
     * Becomes true once the broker has sent ConnectionOpenOkFrame.
     * @var bool
     */
    protected $isHandshaked = false;

    /**
     * The broker's supported features.
     * @var Features
     */
    private $features;

    /**
     * Channel limit negotiated from the tune frame; null until then.
     * @var int
     */
    private $maximumChannelCount;

    /**
     * Frame size limit negotiated from the tune frame; null until then.
     * @var int
     */
    private $maximumFrameSize;

    /**
     * Registered channels, indexed by channel id.
     * @var array
     */
    private $channels = [];

    /**
     * Id at which findChannelId() starts looking for a free channel.
     * @var int
     */
    private $nextChannelId = 1;

    /**
     * When true every received frame class is logged.
     * @var bool
     */
    private $debug = false;

    /**
     * Header of a frame whose payload has not completely arrived yet.
     *
     * readExact() removes the header from the input buffer, so it has to be
     * remembered here until the rest of the frame can be read.
     *
     * @var string|null
     */
    private $pendingFrameHeader;

    /**
     * Creates the frame parser / serializer and reads host, port, credentials,
     * vhost and the debug flag from the pool configuration.
     */
    protected function init()
    {
        $this->parser = new FrameParser(new TableParser());
        $this->serializer = new FrameSerializer(new TableSerializer());

        $this->connectionOptions = new ConnectionOptions(
            $this->pool->config->host->value,
            $this->pool->config->port->value,
            $this->pool->config->username->value,
            $this->pool->config->password->value,
            $this->pool->config->vhost->value
        );

        // Debug logging is enabled by the mere presence of the config entry.
        $this->debug = isset($this->pool->config->debug);
    }

    /**
     * Called when the transport is ready: starts the AMQP handshake by
     * writing the protocol header.
     *
     * When the handshake has already been completed the header is not sent
     * again; only the parent handler runs.
     */
    public function onReady()
    {
        if ($this->isHandshaked) {
            parent::onReady();
            // Without this return the header was re-sent and the parent
            // handler invoked a second time.
            return;
        }

        $this->write(self::PROTOCOL_HEADER);

        parent::onReady();
    }

    /**
     * Reads and processes every complete frame available in the input buffer.
     *
     * Before the handshake is finished exactly one frame is handled per call;
     * afterwards frames are processed until the buffer is empty, a frame is
     * incomplete, or the broker closes the connection.
     *
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
     * @throws \InvalidArgumentException
     */
    protected function onRead()
    {
        if ($this->getInputLength() <= 0) {
            return;
        }
        // set busy for connection
        $this->busy = true;

        do {
            $frame = $this->readAmqpFrame();
            if ($frame === null) {
                // Incomplete frame (or nothing parsed): wait for more data.
                return;
            }
            if ($this->debug) {
                $this->log(sprintf('[AMQP] %s packet received', get_class($frame)));
            }

            if (!$this->isHandshaked) {
                $this->handleHandshakeFrame($frame);
                $this->checkFree();
                return;
            }

            if ($frame instanceof HeartbeatFrame) {
                // Answer a heartbeat by sending the same frame back.
                $this->command($frame);
            } elseif ($frame instanceof ConnectionCloseFrame) {
                $this->trigger(self::EVENT_ON_CONNECTION_CLOSE, $frame->replyCode, $frame->replyText);
                $this->close();
                return;
            } else {
                $this->dispatchFrame($frame);
            }
        } while ($this->bev && $this->getInputLength() > 0);

        $this->checkFree();
    }

    /**
     * Reads one complete frame from the input buffer and parses it.
     *
     * 1) read Frame::HEADER_SIZE
     * 2) get payload length from header
     * 3) concatenate header and payload + 1 byte (the frame end marker) into a buffer
     * 4) parse buffer
     *
     * A header that was read while its payload is still incomplete is kept in
     * $pendingFrameHeader, so the next call continues with the same frame
     * instead of interpreting payload bytes as a new header.
     *
     * @return object|null the parsed frame, or null when no complete frame is available
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     */
    private function readAmqpFrame()
    {
        if ($this->pendingFrameHeader === null) {
            $header = $this->readExact(Frame::HEADER_SIZE);
            if ($header === false) {
                return null;
            }
            $this->pendingFrameHeader = $header;
        }

        $framePayloadSize = Binary::b2i(substr(
            $this->pendingFrameHeader,
            Frame::HEADER_TYPE_SIZE + Frame::HEADER_CHANNEL_SIZE,
            Frame::HEADER_PAYLOAD_LENGTH_SIZE
        ));

        // + 1 byte for the frame end marker that follows the payload
        $payload = $this->readExact($framePayloadSize + 1);
        if ($payload === false) {
            return null;
        }

        $buffer = $this->pendingFrameHeader . $payload;
        $this->pendingFrameHeader = null;

        return $this->parser->feed($buffer);
    }

    /**
     * Handles a frame received while the handshake is still in progress.
     * Frames other than Start, Tune and OpenOk are ignored.
     *
     * @param object $frame
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
     * @throws \InvalidArgumentException
     */
    private function handleHandshakeFrame($frame)
    {
        if ($frame instanceof ConnectionStartFrame) {
            $this->handleConnectionStart($frame);
        } elseif ($frame instanceof ConnectionTuneFrame) {
            $this->handleConnectionTune($frame);
        } elseif ($frame instanceof ConnectionOpenOkFrame) {
            $this->isHandshaked = true;
            // The handshake event is raised with the first opened channel.
            $this->openChannel(function ($channel) {
                $this->trigger(self::EVENT_ON_HANDSHAKE, $channel);
            });
        }
    }

    /**
     * Validates the broker's Start frame, records the broker features and
     * replies with StartOk carrying AMQPLAIN credentials.
     *
     * @param ConnectionStartFrame $frame
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
     *         when the broker reports a version other than 0.9 or does not
     *         offer the AMQPLAIN mechanism
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     * @throws \InvalidArgumentException
     */
    private function handleConnectionStart(ConnectionStartFrame $frame)
    {
        if ($frame->versionMajor !== 0 || $frame->versionMinor !== 9) {
            throw AMQPConnectionException::handshakeFailed(
                $this->connectionOptions,
                sprintf(
                    'the broker reported an unexpected AMQP version (v%d.%d)',
                    $frame->versionMajor,
                    $frame->versionMinor
                )
            );
        }
        if (!preg_match('/\bAMQPLAIN\b/', $frame->mechanisms)) {
            throw AMQPConnectionException::handshakeFailed(
                $this->connectionOptions,
                'the AMQPLAIN authentication mechanism is not supported'
            );
        }

        $this->features = new Features();
        $properties = $frame->serverProperties;
        if (isset($properties['product']) && 'RabbitMQ' === $properties['product']) {
            $this->features->qosSizeLimits = false;
        }
        if (array_key_exists('capabilities', $properties)) {
            $capabilities = $properties['capabilities'];
            if (array_key_exists('per_consumer_qos', $capabilities)) {
                $this->features->perConsumerQos = (bool)$capabilities['per_consumer_qos'];
            }
            if (array_key_exists('exchange_exchange_bindings', $capabilities)) {
                $this->features->exchangeToExchangeBindings = (bool)$capabilities['exchange_exchange_bindings'];
            }
        }

        // Serialize credentials in "AMQPLAIN" format, which is essentially an
        // AMQP table without the 4-byte size header ...
        $user = $this->connectionOptions->getUsername();
        $pass = $this->connectionOptions->getPassword();

        $credentials = "\x05LOGINS" . pack('N', strlen($user)) . $user
            . "\x08PASSWORDS" . pack('N', strlen($pass)) . $pass;

        $this->command(ConnectionStartOkFrame::create(
            [
                'product' => $this->connectionOptions->getProductName(),
                'version' => $this->connectionOptions->getProductVersion(),
                'platform' => PackageInfo::AMQP_PLATFORM,
                'copyright' => PackageInfo::AMQP_COPYRIGHT,
                'information' => PackageInfo::AMQP_INFORMATION,
            ],
            'AMQPLAIN',
            $credentials
        ));
    }

    /**
     * Negotiates channel limit, frame size and heartbeat from the broker's
     * Tune frame, replies with TuneOk and then opens the configured vhost.
     *
     * @param ConnectionTuneFrame $frame
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     * @throws \InvalidArgumentException
     */
    private function handleConnectionTune(ConnectionTuneFrame $frame)
    {
        // Use the broker's limit only when it sets one below the client maximum.
        $this->maximumChannelCount = self::MAXIMUM_CHANNELS;
        if ($frame->channelMax !== self::UNLIMITED_CHANNELS
            && $frame->channelMax < self::MAXIMUM_CHANNELS) {
            $this->maximumChannelCount = $frame->channelMax;
        }

        $this->maximumFrameSize = self::MAXIMUM_FRAME_SIZE;
        if ($frame->frameMax !== self::UNLIMITED_FRAME_SIZE
            && $frame->frameMax < self::MAXIMUM_FRAME_SIZE) {
            $this->maximumFrameSize = $frame->frameMax;
        }

        // Heartbeats stay off when the broker does not use them; otherwise
        // the smaller of the configured and the broker's interval wins, and
        // the broker's value is used when nothing is configured.
        $heartbeatInterval = 0;
        if ($frame->heartbeat !== self::HEARTBEAT_DISABLED) {
            $heartbeatInterval = $this->connectionOptions->getHeartbeatInterval();
            if (null === $heartbeatInterval || $frame->heartbeat < $heartbeatInterval) {
                $heartbeatInterval = $frame->heartbeat;
            }
        }

        $outputFrame = ConnectionTuneOkFrame::create(
            $this->maximumChannelCount,
            $this->maximumFrameSize,
            $heartbeatInterval
        );

        if ($outputFrame->heartbeat > 0) {
            /**
             * We need to set timeout value = negotiated heartbeat + 5 sec
             */
            $timeout = $outputFrame->heartbeat + 5;
            $this->setTimeout($timeout);
            $this->connectionOptions->setHeartbeatInterval($outputFrame->heartbeat);
            $this->connectionOptions->setConnectionTimeout($timeout);
        }

        $this->command($outputFrame);

        $this->command(ConnectionOpenFrame::create(
            $this->connectionOptions->getVhost()
        ));
    }

    /**
     * Forwards a frame as an event named after the frame's class: to the
     * registered channel the frame addresses, otherwise to this connection.
     *
     * @param object $frame
     */
    private function dispatchFrame($frame)
    {
        if (isset($frame->frameChannelId)
            && $frame->frameChannelId > 0
            && array_key_exists($frame->frameChannelId, $this->channels)) {
            /** @var Channel $channel */
            $channel = $this->channels[$frame->frameChannelId];
            $channel->trigger(get_class($frame), $frame);
            return;
        }

        $this->trigger(get_class($frame), $frame);
    }

    /**
     * Passes the channel registered under $id to $callback.
     *
     * When no channel is registered under $id (including the case where no
     * channel exists at all) a new channel is opened with $callback instead;
     * NOTE(review): the id of that new channel is chosen by Channel and is
     * not guaranteed to equal $id.
     *
     * @param callable $callback
     * @param int $id
     * @throws \InvalidArgumentException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
     */
    public function getChannel(callable $callback, $id = 1)
    {
        if (!isset($this->channels[$id])) {
            $this->openChannel($callback);
            return;
        }

        $callback($this->channels[$id]);
    }

    /**
     * Creates a new Channel for this connection.
     *
     * The callback is handed to the Channel constructor; presumably Channel
     * invokes it once the channel is open and registers itself through
     * addChannel() - confirm against the Channel class.
     *
     * @param callable|null $callback
     * @throws \InvalidArgumentException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
     */
    public function openChannel(callable $callback = null)
    {
        new Channel($this, $callback);
    }

    /**
     * Finds an unused channel id and advances the search start past it.
     *
     * @return int
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
     *         when every id up to the negotiated maximum is in use
     */
    public function findChannelId()
    {
        // first check in range [next, max] ...
        $channelId = $this->findFreeChannelIdInRange($this->nextChannelId, $this->maximumChannelCount);

        if ($channelId === null) {
            // then check in range [min, next) ...
            $channelId = $this->findFreeChannelIdInRange(1, $this->nextChannelId - 1);
        }

        if ($channelId === null) {
            throw new AMQPConnectionException('No available channels');
        }

        $this->nextChannelId = $channelId + 1;

        return $channelId;
    }

    /**
     * Returns the lowest id in [$first, $last] that has no registered channel.
     *
     * @param int $first
     * @param int|null $last inclusive upper bound; null yields no result
     * @return int|null
     */
    private function findFreeChannelIdInRange($first, $last)
    {
        for ($channelId = $first; $channelId <= $last; ++$channelId) {
            if (!isset($this->channels[$channelId])) {
                return $channelId;
            }
        }

        return null;
    }

    /**
     * Serializes a frame and writes it to the connection.
     *
     * @param OutgoingFrame $frame
     * @param callable|null $callback registered through onResponse() before the frame is written
     * @return bool result of write()
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
     * @throws \InvalidArgumentException
     */
    public function command(OutgoingFrame $frame, callable $callback = null)
    {
        if ($callback) {
            $this->onResponse($callback);
        }
        $serializedFrame = $this->serializer->serialize($frame);
        return $this->write($serializedFrame);
    }

    /**
     * Registers a channel under the given id and moves the search start of
     * findChannelId() just past the highest registered id.
     *
     * @param int $id
     * @param Channel $channel
     * @return $this
     */
    public function addChannel($id, Channel $channel)
    {
        $this->channels[$id] = $channel;
        $this->nextChannelId = max(array_keys($this->channels)) + 1;
        return $this;
    }

    /**
     * The broker features recorded from the Start frame; null before that
     * frame has been processed.
     *
     * @return Features
     */
    public function getFeatures()
    {
        return $this->features;
    }

    /**
     * @return ConnectionOptions
     */
    public function getConnectionOptions()
    {
        return $this->connectionOptions;
    }

    /**
     * The channel limit negotiated from the Tune frame; null before that
     * frame has been processed.
     *
     * @return int
     */
    public function getMaximumChannelCount()
    {
        return $this->maximumChannelCount;
    }

    /**
     * The frame size limit negotiated from the Tune frame; null before that
     * frame has been processed.
     *
     * @return int
     */
    public function getMaximumFrameSize()
    {
        return $this->maximumFrameSize;
    }

    /**
     * Whether the broker has confirmed the connection with OpenOk.
     *
     * @return bool
     */
    public function isHandshaked()
    {
        return $this->isHandshaked;
    }
}
||
489 |
The break statement is not necessary if it is preceded, for example, by a return statement.
If you would like to keep this construct to be consistent with other case statements, you can safely mark this issue as a false-positive.