These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | namespace PhpAmqpLib\Connection; |
||
3 | |||
4 | use PhpAmqpLib\Channel\AbstractChannel; |
||
5 | use PhpAmqpLib\Channel\AMQPChannel; |
||
6 | use PhpAmqpLib\Exception\AMQPConnectionClosedException; |
||
7 | use PhpAmqpLib\Exception\AMQPHeartbeatMissedException; |
||
8 | use PhpAmqpLib\Exception\AMQPInvalidFrameException; |
||
9 | use PhpAmqpLib\Exception\AMQPIOException; |
||
10 | use PhpAmqpLib\Exception\AMQPNoDataException; |
||
11 | use PhpAmqpLib\Exception\AMQPProtocolConnectionException; |
||
12 | use PhpAmqpLib\Exception\AMQPRuntimeException; |
||
13 | use PhpAmqpLib\Exception\AMQPSocketException; |
||
14 | use PhpAmqpLib\Exception\AMQPTimeoutException; |
||
15 | use PhpAmqpLib\Wire\AMQPReader; |
||
16 | use PhpAmqpLib\Wire\AMQPTable; |
||
17 | use PhpAmqpLib\Wire\AMQPWriter; |
||
18 | use PhpAmqpLib\Wire\IO\AbstractIO; |
||
19 | |||
20 | class AbstractConnection extends AbstractChannel |
||
21 | { |
||
/**
 * Client property table. Each entry is array(type tag, value); the
 * 'capabilities' entry nests a further table of boolean flags.
 *
 * @var array
 * @internal
 */
public static $LIBRARY_PROPERTIES = array(
    'product' => array('S', 'AMQPLib'),
    'platform' => array('S', 'PHP'),
    'version' => array('S', '2.9'),
    'information' => array('S', ''),
    'copyright' => array('S', ''),
    'capabilities' => array(
        'F',
        array(
            'authentication_failure_close' => array('t', true),
            'publisher_confirms' => array('t', true),
            'consumer_cancel_notify' => array('t', true),
            'exchange_exchange_bindings' => array('t', true),
            'basic.nack' => array('t', true),
            'connection.blocked' => array('t', true)
        )
    )
);

/**
 * Channel objects keyed by channel id; filled by channel().
 *
 * @var AMQPChannel[]
 * @internal
 */
public $channels = array();

/** @var int Major version octet read in connection_start() */
protected $version_major;

/** @var int Minor version octet read in connection_start() */
protected $version_minor;

/** @var array Server property table read in connection_start() */
protected $server_properties;

/** @var array Space-separated mechanism list from connection_start(), split into an array */
protected $mechanisms;

/** @var array Space-separated locale list from connection_start(), split into an array */
protected $locales;

/** @var bool Set to true by connect() while tuning is pending; cleared by x_tune_ok() */
protected $wait_tune_ok;

/** @var string Value read in connection_open_ok() / connection_redirect() */
protected $known_hosts;

/** @var AMQPReader|null Reader over $io created by connect(); null after close_input() */
protected $input;

/** @var string Virtual host passed to x_open() */
protected $vhost;

/** @var bool Insist flag passed to x_open() */
protected $insist;

/** @var string Mechanism name passed to x_start_ok() */
protected $login_method;

/** @var string Response passed to x_start_ok() */
protected $login_response;

/** @var string Locale passed to x_start_ok() */
protected $locale;

/** @var int Heartbeat value sent in x_tune_ok() */
protected $heartbeat;

/** @var float microtime(true) stamp of the most recent frame activity */
protected $last_frame;

/** @var int Replaced in connection_tune() when the server proposes a non-zero value */
protected $channel_max = 65535;

/** @var int Replaced in connection_tune() when the server proposes a non-zero value */
protected $frame_max = 131072;

/** @var array Constructor parameters for clone */
protected $construct_params;

/** @var bool Close the connection in destructor */
protected $close_on_destruct = true;

/** @var bool Maintain connection status */
protected $is_connected = false;

/** @var \PhpAmqpLib\Wire\IO\AbstractIO */
protected $io;

/** @var \PhpAmqpLib\Wire\AMQPReader Scratch reader re-used by wait_frame() */
protected $wait_frame_reader;

/** @var callable Handles connection blocking from the server */
private $connection_block_handler;

/** @var callable Handles connection unblocking from the server */
private $connection_unblock_handler;

/** @var int Connection timeout value */
protected $connection_timeout;

/**
 * Circular buffer to speed up prepare_content().
 * Max size limited by $prepare_content_cache_max_size.
 *
 * @var array
 * @see prepare_content()
 */
private $prepare_content_cache = array();

/** @var int Maximal size of $prepare_content_cache */
private $prepare_content_cache_max_size = 100;

/**
 * Maximum time to wait for channel operations, in seconds
 *
 * @var float
 */
private $channel_rpc_timeout;
||
143 | |||
/**
 * Stores the connection settings, prepares the login response and, when
 * connectOnConstruct() says so, connects straight away.
 *
 * When both $user and $password are truthy the login response is built
 * from them (a LOGIN/PASSWORD table with its leading 4 bytes removed).
 * Otherwise the caller-supplied $login_response is kept as is; previously
 * it was overwritten with null, which made the parameter a no-op.
 *
 * NOTE(review): $io is a required parameter declared after optional ones,
 * so every preceding argument must be passed explicitly. The signature is
 * left untouched for backward compatibility.
 *
 * @param string $user
 * @param string $password
 * @param string $vhost
 * @param bool $insist
 * @param string $login_method
 * @param string|null $login_response Used only when $user/$password are not both given
 * @param string $locale
 * @param AbstractIO $io
 * @param int $heartbeat
 * @param int $connection_timeout
 * @param float $channel_rpc_timeout
 * @throws \Exception
 */
public function __construct(
    $user,
    $password,
    $vhost = '/',
    $insist = false,
    $login_method = 'AMQPLAIN',
    $login_response = null,
    $locale = 'en_US',
    AbstractIO $io,
    $heartbeat = 0,
    $connection_timeout = 0,
    $channel_rpc_timeout = 0.0
) {
    // save the params for the use of __clone
    $this->construct_params = func_get_args();

    $this->wait_frame_reader = new AMQPReader(null);
    $this->vhost = $vhost;
    $this->insist = $insist;
    $this->login_method = $login_method;
    $this->locale = $locale;
    $this->io = $io;
    $this->heartbeat = $heartbeat;
    $this->connection_timeout = $connection_timeout;
    $this->channel_rpc_timeout = $channel_rpc_timeout;

    if ($user && $password) {
        $credentials = new AMQPWriter();
        $credentials->write_table(array(
            'LOGIN' => array('S', $user),
            'PASSWORD' => array('S', $password)
        ));

        // Skip the length: drop the first 4 bytes of the serialized table
        $responseValue = $credentials->getvalue();
        $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
    } else {
        // No credentials: honour whatever response the caller supplied (null by default)
        $this->login_response = $login_response;
    }

    // Lazy Connection waits on connecting
    if ($this->connectOnConstruct()) {
        $this->connect();
    }
}
|
205 | |||
/**
 * Connects to the AMQP server.
 *
 * Repeats the handshake (socket connect, protocol header, start/start-ok,
 * tune, open) until x_open() reports no redirect. On any exception the
 * connection state, channels, input reader and socket are torn down and
 * the exception is rethrown.
 *
 * @return null
 * @throws \Exception
 */
protected function connect()
{
    try {
        // Loop until we connect
        while (!$this->isConnected()) {
            // Optimistically flag the connection as up; reset below on failure
            $this->setIsConnected(true);

            // Connect the socket
            $this->io->connect();

            $this->channels = array();
            // The connection object itself is treated as channel 0
            parent::__construct($this, 0);

            $this->input = new AMQPReader(null, $this->io);

            $this->write($this->amqp_protocol_header);
            // assume frame was sent successfully, used in $this->wait_channel()
            $this->last_frame = microtime(true);

            $start_methods = array($this->waitHelper->get_wait('connection.start'));
            $this->wait($start_methods, false, $this->connection_timeout);

            $this->x_start_ok(
                $this->getLibraryProperties(),
                $this->login_method,
                $this->login_response,
                $this->locale
            );

            // x_tune_ok() clears the flag once tuning has been answered
            $this->wait_tune_ok = true;
            do {
                $this->wait(array(
                    $this->waitHelper->get_wait('connection.secure'),
                    $this->waitHelper->get_wait('connection.tune')
                ));
            } while ($this->wait_tune_ok);

            $redirect_host = $this->x_open($this->vhost, '', $this->insist);
            if ($redirect_host) {
                // we were redirected, close the socket, loop and try again
                $this->setIsConnected(false);
                $this->closeChannels();
                $this->close_socket();
                continue;
            }

            // Reconnected, we weren't redirected
            $this->io->reenableHeartbeat();

            return null;
        }
    } catch (\Exception $failure) {
        // Something went wrong, set the connection status
        $this->setIsConnected(false);
        $this->closeChannels();
        $this->close_input();
        $this->close_socket();

        throw $failure; // Rethrow exception
    }
}
||
268 | |||
/**
 * Reconnects using the original connection settings.
 * Channels that were established previously are not recreated.
 */
public function reconnect()
{
    // Best-effort close of the AMQP connection; errors are ignored by safeClose()
    $this->safeClose();

    // Drop the socket/stream before dialling again
    $this->io->close();

    // getIO can initiate the connection setting via LazyConnection, so force the flag here
    $this->setIsConnected(false);

    $this->connect();
}
||
282 | |||
/**
 * Cloning replays the constructor with the arguments saved in
 * $construct_params, making a new connection to the same server.
 */
public function __clone()
{
    $constructor = array($this, '__construct');
    call_user_func_array($constructor, $this->construct_params);
}
|
290 | |||
/**
 * Closes the connection on destruction unless that was disabled through
 * set_close_on_destruct().
 */
public function __destruct()
{
    if (!$this->close_on_destruct) {
        return;
    }

    $this->safeClose();
}
||
297 | |||
/**
 * Attempts to close the connection safely: close() is only called while an
 * input reader exists, and any exception it raises is deliberately ignored.
 */
protected function safeClose()
{
    if (empty($this->input)) {
        return;
    }

    try {
        $this->close();
    } catch (\Exception $ignored) {
        // Nothing here
    }
}
||
311 | |||
/**
 * Delegates to the IO layer's select().
 *
 * A closed-connection error tears the connection down via do_close(); any
 * other runtime error only marks it as disconnected. Both are rethrown.
 *
 * @param int $sec
 * @param int $usec
 * @return mixed
 */
public function select($sec, $usec = 0)
{
    try {
        $ready = $this->io->select($sec, $usec);
    } catch (AMQPConnectionClosedException $closed) {
        $this->do_close();
        throw $closed;
    } catch (AMQPRuntimeException $error) {
        $this->setIsConnected(false);
        throw $error;
    }

    return $ready;
}
||
329 | |||
/**
 * Controls whether the destructor closes the connection.
 * Useful after a fork, when the parent process connection must stay open.
 *
 * @param bool $close
 */
public function set_close_on_destruct($close = true)
{
    $this->close_on_destruct = $close ? true : false;
}
|
340 | |||
/**
 * Closes the input reader, if there is one, and forgets it.
 */
protected function close_input()
{
    if ($this->debug) {
        $this->debug->debug_msg('closing input');
    }

    if ($this->input === null) {
        return;
    }

    $this->input->close();
    $this->input = null;
}
|
350 | |||
/**
 * Closes the underlying IO object when one is set.
 */
protected function close_socket()
{
    if ($this->debug) {
        $this->debug->debug_msg('closing socket');
    }

    if (!$this->io) {
        return;
    }

    $this->io->close();
}
||
359 | |||
/**
 * Hex-dumps $data to the debug helper and writes it to the IO layer.
 *
 * A closed-connection error tears the connection down via do_close(); any
 * other runtime error only marks it as disconnected. Both are rethrown.
 *
 * @param string $data
 */
public function write($data)
{
    $this->debug->debug_hexdump($data);

    try {
        $this->io->write($data);
    } catch (AMQPConnectionClosedException $closed) {
        $this->do_close();
        throw $closed;
    } catch (AMQPRuntimeException $error) {
        $this->setIsConnected(false);
        throw $error;
    }
}
|
377 | |||
/**
 * Marks the connection as disconnected, then releases the input reader
 * and the socket.
 */
protected function do_close()
{
    $this->setIsConnected(false);
    $this->close_input();
    $this->close_socket();
}
||
384 | |||
/**
 * Returns the lowest id in 1..$channel_max with no entry in $channels.
 *
 * @return int
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException When every id is taken
 */
public function get_free_channel_id()
{
    $candidate = 1;
    while ($candidate <= $this->channel_max) {
        if (!isset($this->channels[$candidate])) {
            return $candidate;
        }
        $candidate++;
    }

    throw new AMQPRuntimeException('No free channel ids');
}
||
399 | |||
/**
 * Builds the content frames for a message and writes them to the server.
 *
 * The writer returned by prepare_content() is the one that gets sent.
 * Previously that return value was discarded, so calling this method
 * without $pkt (its default) ended in getvalue() being called on null.
 *
 * @param string $channel
 * @param int $class_id
 * @param int $weight
 * @param int $body_size
 * @param string $packed_properties
 * @param string $body
 * @param AMQPWriter|null $pkt Writer to append to; a new one is created when omitted
 */
public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
{
    $pkt = $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
    $this->write($pkt->getvalue());
}
||
414 | |||
/**
 * Returns a new AMQPWriter or mutates the provided $pkt
 *
 * Writes one header frame (type octet 2) followed by the body split into
 * frames (type octet 3) of at most $frame_max - 8 bytes each. The start of
 * the header frame is cached per channel/properties/class/weight.
 *
 * @param string $channel
 * @param int $class_id
 * @param int $weight
 * @param int $body_size
 * @param string $packed_properties
 * @param string $body
 * @param AMQPWriter $pkt
 * @return AMQPWriter
 */
public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt = null)
{
    if (!$pkt) {
        $pkt = new AMQPWriter();
    }

    // Content already prepared ?
    $cache_key = implode('|', array($channel, $packed_properties, $class_id, $weight));

    if (!isset($this->prepare_content_cache[$cache_key])) {
        $header = new AMQPWriter();
        $header->write_octet(2);
        $header->write_short($channel);
        // 12 presumably covers class_id + weight + body_size around the properties — TODO confirm
        $header->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
        $header->write_short($class_id);
        $header->write_short($weight);
        $this->prepare_content_cache[$cache_key] = $header->getvalue();

        // Over capacity: evict the oldest (first) entry
        if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
            reset($this->prepare_content_cache);
            unset($this->prepare_content_cache[key($this->prepare_content_cache)]);
        }
    }

    $pkt->write($this->prepare_content_cache[$cache_key]);
    $pkt->write_longlong($body_size);
    $pkt->write($packed_properties);
    $pkt->write_octet(0xCE);

    // memory efficiency: walk the string instead of biting
    // it. good for very large packets (close in size to
    // memory_limit setting)
    $chunk_size = $this->frame_max - 8;
    $body_length = mb_strlen($body, 'ASCII');
    for ($offset = 0; $offset < $body_length; $offset += $chunk_size) {
        $chunk = mb_substr($body, $offset, $chunk_size, 'ASCII');

        $pkt->write_octet(3);
        $pkt->write_short($channel);
        $pkt->write_long(mb_strlen($chunk, 'ASCII'));
        $pkt->write($chunk);
        $pkt->write_octet(0xCE);
    }

    return $pkt;
}
||
482 | |||
/**
 * Builds a method frame for $channel, writes it to the server and logs
 * the method signature through the debug helper.
 *
 * @param string $channel
 * @param array $method_sig
 * @param AMQPWriter|string $args
 * @param AMQPWriter|null $pkt
 */
protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
{
    $frame = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
    $this->write($frame->getvalue());

    $this->debug->debug_method_signature1($method_sig);
}
||
495 | |||
/**
 * Returns a new AMQPWriter or mutates the provided $pkt
 *
 * Appends one method frame: type octet 1, channel, payload length,
 * class id, method id, the serialized arguments and the 0xCE end octet.
 *
 * @param string $channel
 * @param array $method_sig array(class_id, method_id)
 * @param AMQPWriter|string $args
 * @param AMQPWriter $pkt
 * @return AMQPWriter
 */
protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
{
    if ($args instanceof AMQPWriter) {
        $args = $args->getvalue();
    }

    if (!$pkt) {
        $pkt = new AMQPWriter();
    }

    // 4 = length of class_id and method_id in payload
    $payload_size = mb_strlen($args, 'ASCII') + 4;

    $pkt->write_octet(1);
    $pkt->write_short($channel);
    $pkt->write_long($payload_size);

    $pkt->write_short($method_sig[0]); // class_id
    $pkt->write_short($method_sig[1]); // method_id
    $pkt->write($args);

    $pkt->write_octet(0xCE);

    $this->debug->debug_method_signature1($method_sig);

    return $pkt;
}
||
528 | |||
/**
 * Waits for a frame from the server
 *
 * Reads the frame header (type, channel, size), then the payload plus the
 * trailing end octet, and checks that octet is 0xCE. The reader timeout is
 * set to $timeout for the read and restored afterwards.
 *
 * Both the timeout and the no-data handlers now check that the input
 * reader still exists before restoring its timeout. The timeout handler
 * used to dereference it unconditionally, which would replace the original
 * exception with a fatal error if the reader had been released meanwhile.
 *
 * @param int|float|null $timeout
 * @return array array(frame_type, channel, payload)
 * @throws \Exception
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
 */
protected function wait_frame($timeout = 0)
{
    if (is_null($this->input)) {
        $this->setIsConnected(false);
        throw new AMQPConnectionClosedException('Broken pipe or closed connection');
    }

    $currentTimeout = $this->input->getTimeout();
    $this->input->setTimeout($timeout);

    try {
        // frame_type + channel_id + size
        $this->wait_frame_reader->reuse(
            $this->input->read(AMQPReader::OCTET + AMQPReader::SHORT + AMQPReader::LONG)
        );

        $frame_type = $this->wait_frame_reader->read_octet();
        $class = self::$PROTOCOL_CONSTANTS_CLASS;
        if (!array_key_exists($frame_type, $class::$FRAME_TYPES)) {
            throw new AMQPInvalidFrameException('Invalid frame type ' . $frame_type);
        }
        $channel = $this->wait_frame_reader->read_short();
        $size = $this->wait_frame_reader->read_long();

        // payload + ch
        $this->wait_frame_reader->reuse($this->input->read(AMQPReader::OCTET + (int) $size));

        $payload = $this->wait_frame_reader->read($size);
        $ch = $this->wait_frame_reader->read_octet();
    } catch (AMQPTimeoutException $e) {
        // Guarded like the no-data branch below: the reader may be gone by now
        if ($this->input) {
            $this->input->setTimeout($currentTimeout);
        }
        throw $e;
    } catch (AMQPNoDataException $e) {
        if ($this->input) {
            $this->input->setTimeout($currentTimeout);
        }
        throw $e;
    } catch (AMQPConnectionClosedException $exception) {
        $this->do_close();
        throw $exception;
    }

    $this->input->setTimeout($currentTimeout);

    if ($ch != 0xCE) {
        throw new AMQPInvalidFrameException(sprintf(
            'Framing error, unexpected byte: %x',
            $ch
        ));
    }

    return array($frame_type, $channel, $payload);
}
||
593 | |||
/**
 * Waits for a frame from the server destined for a particular channel.
 *
 * Heartbeat frames (type 8 on channel 0) are skipped and their waiting
 * time is deducted from the remaining timeout. Frames for other channels
 * are queued on that channel if it still exists. A method frame for
 * channel 0 is processed immediately through wait().
 *
 * @param string $channel_id
 * @param int|float|null $timeout
 * @return array array(frame_type, payload)
 */
protected function wait_channel($channel_id, $timeout = 0)
{
    // Work on a copy so the caller's timeout value stays untouched.
    $remaining = $timeout;

    for (;;) {
        $started_at = microtime(true);

        try {
            $frame = $this->wait_frame($remaining);
        } catch (AMQPTimeoutException $timed_out) {
            $deadline_passed = $this->heartbeat
                && $this->last_frame
                && microtime(true) - ($this->heartbeat * 2) > $this->last_frame;

            if ($deadline_passed) {
                $this->debug->debug_msg("missed server heartbeat (at threshold * 2)");
                $this->setIsConnected(false);
                throw new AMQPHeartbeatMissedException("Missed server heartbeat");
            }

            throw $timed_out;
        }

        list($frame_type, $frame_channel, $payload) = $frame;

        $this->last_frame = microtime(true);

        if ($frame_channel === 0 && $frame_type === 8) {
            // skip heartbeat frames and reduce the timeout by the time passed
            $this->debug->debug_msg("received server heartbeat");

            if ($remaining > 0) {
                $remaining -= $this->last_frame - $started_at;
                if ($remaining <= 0) {
                    // If timeout has been reached, throw the exception without calling wait_frame
                    throw new AMQPTimeoutException("Timeout waiting on channel");
                }
            }

            continue;
        }

        if ($frame_channel == $channel_id) {
            return array($frame_type, $payload);
        }

        // Not the channel we were looking for. Queue this frame
        // for later, when the other channel is looking for frames.
        // Make sure the channel still exists, it could have been
        // closed by a previous Exception.
        if (isset($this->channels[$frame_channel])) {
            array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload));
        }

        // If we just queued up a method for channel 0 (the Connection
        // itself) it's probably a close method in reaction to some
        // error, so deal with it right away.
        if ($frame_type == 1 && $frame_channel == 0) {
            $this->wait();
        }
    }
}
||
655 | |||
/**
 * Fetches the channel object registered under $channel_id, or creates and
 * registers a new one. A falsy id selects the next free channel id.
 *
 * @param int $channel_id
 * @return AMQPChannel
 */
public function channel($channel_id = null)
{
    if (isset($this->channels[$channel_id])) {
        return $this->channels[$channel_id];
    }

    if (!$channel_id) {
        $channel_id = $this->get_free_channel_id();
    }

    $created = new AMQPChannel($this->connection, $channel_id, true, $this->channel_rpc_timeout);
    $this->channels[$channel_id] = $created;

    return $created;
}
||
675 | |||
/**
 * Requests a connection close
 *
 * Disables the heartbeat, then returns null if there is no protocol writer
 * or the connection is already down. Otherwise it closes the channels,
 * sends connection.close and waits for connection.close_ok. On failure
 * the connection is torn down via do_close() and the exception rethrown.
 *
 * @param int $reply_code
 * @param string $reply_text
 * @param array $method_sig
 * @return mixed|null
 */
public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
{
    $this->io->disableHeartbeat();

    if (empty($this->protocolWriter) || !$this->isConnected()) {
        return null;
    }

    try {
        $this->closeChannels();

        $close_frame = $this->protocolWriter->connectionClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        list($class_id, $method_id, $args) = $close_frame;
        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array($this->waitHelper->get_wait('connection.close_ok'));
        $result = $this->wait($expected, false, $this->connection_timeout);
    } catch (\Exception $failure) {
        $this->do_close();
        throw $failure;
    }

    $this->setIsConnected(false);

    return $result;
}
||
715 | |||
/**
 * Handles a close request sent by the server: reads the reply code, reply
 * text and offending class/method ids, confirms the close, then reports
 * the details as an exception.
 *
 * @param AMQPReader $reader
 * @throws \PhpAmqpLib\Exception\AMQPProtocolConnectionException
 */
protected function connection_close(AMQPReader $reader)
{
    $reply_code = $reader->read_short();
    $reply_text = $reader->read_shortstr();
    $failed_method = array(
        $reader->read_short(), // class_id
        $reader->read_short()  // method_id
    );

    $this->x_close_ok();

    throw new AMQPProtocolConnectionException($reply_code, $reply_text, $failed_method);
}
||
731 | |||
/**
 * Confirms a connection close: sends connection.close_ok, then tears the
 * connection down locally.
 */
protected function x_close_ok()
{
    // get_wait() yields a comma-separated signature; split it into its ids
    $method_sig = explode(',', $this->waitHelper->get_wait('connection.close_ok'));
    $this->send_method_frame($method_sig);

    $this->do_close();
}
||
742 | |||
/**
 * Handles the close confirmation by tearing the connection down locally.
 *
 * @param AMQPReader $args Not read
 */
protected function connection_close_ok($args)
{
    $this->do_close();
}
||
752 | |||
/**
 * Sends connection.open (method 10,40) and waits for the reply.
 * connection.redirect is only accepted when the protocol version is '0.8'.
 *
 * @param string $virtual_host
 * @param string $capabilities
 * @param bool $insist
 * @return mixed Whatever wait() returns for the received method
 */
protected function x_open($virtual_host, $capabilities = '', $insist = false)
{
    $open_args = new AMQPWriter();
    $open_args->write_shortstr($virtual_host);
    $open_args->write_shortstr($capabilities);
    $open_args->write_bits(array($insist));
    $this->send_method_frame(array(10, 40), $open_args);

    $expected = array($this->waitHelper->get_wait('connection.open_ok'));
    if ($this->protocolVersion == '0.8') {
        $expected[] = $this->waitHelper->get_wait('connection.redirect');
    }

    return $this->wait($expected);
}
||
777 | |||
/**
 * Signals that the connection is ready; records the known hosts string
 * sent with the confirmation.
 *
 * @param AMQPReader $args
 */
protected function connection_open_ok($args)
{
    $this->known_hosts = $args->read_shortstr();

    $message = 'Open OK! known_hosts: ' . $this->known_hosts;
    $this->debug->debug_msg($message);
}
||
788 | |||
/**
 * Asks the client to use a different server
 *
 * @param AMQPReader $args
 * @return string The host to redirect to
 */
protected function connection_redirect($args)
{
    $redirect_host = $args->read_shortstr();
    $this->known_hosts = $args->read_shortstr();

    $message = sprintf(
        'Redirected to [%s], known_hosts [%s]',
        $redirect_host,
        $this->known_hosts
    );
    $this->debug->debug_msg($message);

    return $redirect_host;
}
||
807 | |||
/**
 * Security mechanism challenge
 *
 * The challenge is read from the frame and then discarded; no response
 * is sent from here.
 *
 * @param AMQPReader $args
 */
protected function connection_secure($args)
{
    $args->read_longstr();
}
||
817 | |||
/**
 * Security mechanism response: sends $response as method 10,21.
 *
 * @param string $response
 */
protected function x_secure_ok($response)
{
    $secure_args = new AMQPWriter();
    $secure_args->write_longstr($response);

    $this->send_method_frame(array(10, 21), $secure_args);
}
||
829 | |||
/**
 * Starts connection negotiation
 *
 * Records the server's version, property table, mechanisms and locales
 * (the last two arrive as space-separated strings) and hands them to the
 * debug helper.
 *
 * @param AMQPReader $args
 */
protected function connection_start($args)
{
    // Fields must be consumed in wire order
    $this->version_major = $args->read_octet();
    $this->version_minor = $args->read_octet();
    $this->server_properties = $args->read_table();

    $mechanism_list = $args->read_longstr();
    $this->mechanisms = explode(' ', $mechanism_list);

    $locale_list = $args->read_longstr();
    $this->locales = explode(' ', $locale_list);

    $this->debug->debug_connection_start(
        $this->version_major,
        $this->version_minor,
        $this->server_properties,
        $this->mechanisms,
        $this->locales
    );
}
||
851 | |||
/**
 * Sends connection.start-ok (method 10,11) with the client properties,
 * chosen mechanism, login response and locale.
 *
 * @param AMQPTable|array $clientProperties
 * @param string $mechanism
 * @param string $response
 * @param string $locale
 */
protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
{
    $start_args = new AMQPWriter();
    $start_args->write_table($clientProperties);
    $start_args->write_shortstr($mechanism);
    $start_args->write_longstr($response);
    $start_args->write_shortstr($locale);

    $this->send_method_frame(array(10, 11), $start_args);
}
||
867 | |||
/**
 * Proposes connection tuning parameters
 *
 * Non-zero channel_max / frame_max values from the server replace the
 * local ones. The server heartbeat is only read when no local heartbeat
 * is set (null). The resulting values are sent back with x_tune_ok().
 *
 * @param AMQPReader $args
 */
protected function connection_tune($args)
{
    $proposed_channel_max = $args->read_short();
    if ($proposed_channel_max) {
        $this->channel_max = $proposed_channel_max;
    }

    $proposed_frame_max = $args->read_long();
    if ($proposed_frame_max) {
        $this->frame_max = $proposed_frame_max;
    }

    // use server proposed value if not set
    if (is_null($this->heartbeat)) {
        $this->heartbeat = $args->read_short();
    }

    $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
}
||
892 | |||
/**
 * Negotiates connection tuning parameters: sends connection.tune-ok
 * (method 10,31) and clears the flag that connect() waits on.
 *
 * @param int $channel_max
 * @param int $frame_max
 * @param int $heartbeat
 */
protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
{
    $tune_args = new AMQPWriter();
    $tune_args->write_short($channel_max);
    $tune_args->write_long($frame_max);
    $tune_args->write_short($heartbeat);

    $this->send_method_frame(array(10, 31), $tune_args);

    $this->wait_tune_ok = false;
}
||
909 | |||
/**
 * Returns the socket exposed by the IO object.
 *
 * @return resource
 * @deprecated No direct access to communication socket should be available.
 */
public function getSocket()
{
    $socket = $this->io->getSocket();

    return $socket;
}
||
918 | |||
/**
 * Returns the IO object held by this connection.
 *
 * @return \PhpAmqpLib\Wire\IO\AbstractIO
 * @deprecated
 */
public function getIO()
{
    $io = $this->io;

    return $io;
}
||
927 | |||
/**
 * Check connection heartbeat if enabled; the work is delegated to the IO object.
 *
 * @throws AMQPHeartbeatMissedException If too much time passed since last connection activity.
 * @throws AMQPConnectionClosedException If connection was closed due to network issues or timeouts.
 * @throws AMQPSocketException If connection was already closed.
 * @throws AMQPTimeoutException If heartbeat write takes too much time.
 * @throws AMQPIOException If other connection problems occurred.
 */
public function checkHeartBeat()
{
    $io = $this->io;
    $io->check_heartbeat();
}
||
940 | |||
/**
 * Handles connection blocked notifications by dispatching to the block handler.
 *
 * @param AMQPReader $args Reader from which the reason (a short string) is taken
 */
protected function connection_blocked(AMQPReader $args)
{
    // The reason string is the only value forwarded to the handler.
    $reason = $args->read_shortstr();
    $handlerArguments = array($reason);

    $this->dispatch_to_handler($this->connection_block_handler, $handlerArguments);
}
||
951 | |||
/**
 * Handles connection unblocked notifications by dispatching to the unblock handler.
 *
 * @param AMQPReader $args Not read; an unblock event carries no arguments
 */
protected function connection_unblocked(AMQPReader $args)
{
    // The handler is dispatched with an empty argument list.
    $noArguments = array();

    $this->dispatch_to_handler($this->connection_unblock_handler, $noArguments);
}
||
962 | |||
/**
 * Registers the callback that connection_blocked() dispatches to.
 *
 * connection_blocked() hands the reason string to dispatch_to_handler()
 * together with this callback.
 *
 * @param callable $callback
 */
public function set_connection_block_handler($callback)
{
    $this->connection_block_handler = $callback;
}
||
972 | |||
/**
 * Registers the callback that connection_unblocked() dispatches to.
 *
 * connection_unblocked() passes an empty argument list to dispatch_to_handler()
 * together with this callback.
 *
 * @param callable $callback
 */
public function set_connection_unblock_handler($callback)
{
    $this->connection_unblock_handler = $callback;
}
||
982 | |||
/**
 * Reports the connection status flag as a strict boolean.
 *
 * @return bool
 */
public function isConnected()
{
    // Normalise whatever is stored in the flag to true/false.
    return $this->is_connected ? true : false;
}
||
992 | |||
/**
 * Stores the connection status flag, coerced to a boolean.
 *
 * @param bool $is_connected
 */
protected function setIsConnected($is_connected)
{
    $this->is_connected = $is_connected ? true : false;
}
||
1002 | |||
/**
 * Closes every registered channel except the entry stored under key 0.
 *
 * Closing is best-effort: an exception raised by one channel is discarded
 * and the remaining channels are still processed.
 */
protected function closeChannels()
{
    foreach ($this->channels as $channelId => $openChannel) {
        // channels[0] is this connection object, so it is left alone here.
        if ($channelId !== 0) {
            try {
                $openChannel->close();
            } catch (\Exception $ignored) {
                /* Ignore closing errors */
            }
        }
    }
}
||
1020 | |||
/**
 * Should the connection be attempted during construction?
 *
 * This implementation always answers yes.
 *
 * @return bool
 */
public function connectOnConstruct()
{
    return true;
}
||
1030 | |||
/**
 * Returns the server properties stored on this connection.
 *
 * @return array
 */
public function getServerProperties()
{
    $properties = $this->server_properties;

    return $properties;
}
||
1038 | |||
/**
 * Get the library properties for populating the client protocol information.
 *
 * The value is the static $LIBRARY_PROPERTIES table of this class.
 *
 * @return array
 */
public function getLibraryProperties()
{
    $properties = self::$LIBRARY_PROPERTIES;

    return $properties;
}
||
1048 | 12 | ||
/**
 * Tries each host definition in order and returns the first connection that succeeds.
 *
 * Each definition is checked with validate_host() right before it is tried, so a
 * malformed entry raises \InvalidArgumentException immediately. When a connection
 * attempt throws, the next definition is tried; if every attempt fails, the
 * exception from the last attempt is rethrown.
 *
 * @param array|\Traversable $hosts Host definitions, each with the keys 'host', 'port',
 *                                  'user', 'password' and an optional 'vhost' (defaults to "/")
 * @param array $options Passed unchanged to try_create_connection()
 * @return mixed The value returned by try_create_connection()
 * @throws \InvalidArgumentException If $hosts is empty or not iterable, or a definition lacks a required key
 * @throws \Exception The error of the last attempted host when none could be connected
 */
public static function create_connection($hosts, $options = array())
{
    $latest_exception = null;

    if (is_array($hosts) || $hosts instanceof \Traversable) {
        foreach ($hosts as $hostdef) {
            self::validate_host($hostdef);
            $host = $hostdef['host'];
            $port = $hostdef['port'];
            $user = $hostdef['user'];
            $password = $hostdef['password'];
            $vhost = isset($hostdef['vhost']) ? $hostdef['vhost'] : "/";

            try {
                return static::try_create_connection($host, $port, $user, $password, $vhost, $options);
            } catch (\Exception $e) {
                // Remember the failure and fall through to the next host.
                $latest_exception = $e;
            }
        }
    }

    // Without this guard an empty host list would reach "throw null",
    // which is an engine error rather than a meaningful exception.
    if ($latest_exception === null) {
        throw new \InvalidArgumentException('At least one host definition is required to create a connection.');
    }

    throw $latest_exception;
}
||
1067 | |||
/**
 * Ensures a host definition carries every mandatory key.
 *
 * Keys are checked in the order host, port, user, password. The first key that
 * is absent, or present but null (isset() is used), triggers the exception.
 *
 * @param array $host
 * @throws \InvalidArgumentException Naming the first missing key
 */
public static function validate_host($host)
{
    $requiredKeyErrors = array(
        'host' => "'host' key is required.",
        'port' => "'port' key is required.",
        'user' => "'user' key is required.",
        'password' => "'password' key is required.",
    );

    foreach ($requiredKeyErrors as $requiredKey => $errorMessage) {
        if (isset($host[$requiredKey])) {
            continue;
        }

        throw new \InvalidArgumentException($errorMessage);
    }
}
||
1082 | } |
||
1083 |
This property has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.