These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace PhpAmqpLib\Channel; |
||
4 | |||
5 | use PhpAmqpLib\Connection\AbstractConnection; |
||
6 | use PhpAmqpLib\Exception\AMQPBasicCancelException; |
||
7 | use PhpAmqpLib\Exception\AMQPChannelClosedException; |
||
8 | use PhpAmqpLib\Exception\AMQPConnectionBlockedException; |
||
9 | use PhpAmqpLib\Exception\AMQPConnectionClosedException; |
||
10 | use PhpAmqpLib\Exception\AMQPProtocolChannelException; |
||
11 | use PhpAmqpLib\Exception\AMQPRuntimeException; |
||
12 | use PhpAmqpLib\Helper\Assert; |
||
13 | use PhpAmqpLib\Message\AMQPMessage; |
||
14 | use PhpAmqpLib\Wire; |
||
15 | use PhpAmqpLib\Wire\AMQPReader; |
||
16 | use PhpAmqpLib\Wire\AMQPTable; |
||
17 | use PhpAmqpLib\Wire\AMQPWriter; |
||
18 | |||
19 | class AMQPChannel extends AbstractChannel |
||
20 | { |
||
21 | /** |
||
22 | * @var callable[] |
||
23 | * @internal Use is_consuming() to check if there is active callbacks |
||
24 | */ |
||
25 | public $callbacks = array(); |
||
26 | |||
27 | /** @var bool Whether or not the channel has been "opened" */ |
||
28 | protected $is_open = false; |
||
29 | |||
30 | /** @var int */ |
||
31 | protected $default_ticket = 0; |
||
32 | |||
33 | /** @var bool */ |
||
34 | protected $active = true; |
||
35 | |||
36 | /** @var array */ |
||
37 | protected $alerts = array(); |
||
38 | |||
39 | /** @var bool */ |
||
40 | protected $auto_decode; |
||
41 | |||
42 | /** |
||
43 | * These parameters will be passed to function in case of basic_return: |
||
44 | * param int $reply_code |
||
45 | * param string $reply_text |
||
46 | * param string $exchange |
||
47 | * param string $routing_key |
||
48 | * param AMQPMessage $msg |
||
49 | * |
||
50 | * @var null|callable |
||
51 | */ |
||
52 | protected $basic_return_callback; |
||
53 | |||
54 | /** @var array Used to keep track of the messages that are going to be batch published. */ |
||
55 | protected $batch_messages = array(); |
||
56 | |||
57 | /** |
||
58 | * If the channel is in confirm_publish mode this array will store all published messages |
||
59 | * until they get ack'ed or nack'ed |
||
60 | * |
||
61 | * @var AMQPMessage[] |
||
62 | */ |
||
63 | private $published_messages = array(); |
||
64 | |||
65 | /** @var int */ |
||
66 | private $next_delivery_tag = 0; |
||
67 | |||
68 | /** @var null|callable */ |
||
69 | private $ack_handler; |
||
70 | |||
71 | /** @var null|callable */ |
||
72 | private $nack_handler; |
||
73 | |||
74 | /** |
||
75 | * Circular buffer to speed up both basic_publish() and publish_batch(). |
||
76 | * Max size limited by $publish_cache_max_size. |
||
77 | * |
||
78 | * @var array |
||
79 | * @see basic_publish() |
||
80 | * @see publish_batch() |
||
81 | */ |
||
82 | private $publish_cache = array(); |
||
83 | |||
84 | /** |
||
85 | * Maximal size of $publish_cache |
||
86 | * |
||
87 | * @var int |
||
88 | */ |
||
89 | private $publish_cache_max_size = 100; |
||
90 | |||
91 | /** |
||
92 | * Maximum time to wait for operations on this channel, in seconds. |
||
93 | * @var float |
||
94 | */ |
||
95 | protected $channel_rpc_timeout; |
||
96 | |||
/**
 * Creates the channel and immediately opens it on the given connection.
 *
 * When no channel id is supplied, a free one is requested from the connection.
 * NOTE(review): the id check is a loose comparison, so any value that compares
 * equal to null (for example 0) also triggers allocation of a free id; confirm
 * this is intended before tightening it to a strict comparison.
 *
 * @param AbstractConnection $connection
 * @param int|null $channel_id
 * @param bool $auto_decode
 * @param int|float $channel_rpc_timeout Maximum time to wait for operations on this channel, in seconds
 * @throws \Exception Anything raised while opening; close() is called before it is rethrown
 */
public function __construct($connection, $channel_id = null, $auto_decode = true, $channel_rpc_timeout = 0)
{
    $channel_id = ($channel_id == null) ? $connection->get_free_channel_id() : $channel_id;

    parent::__construct($connection, $channel_id);

    $this->debug->debug_msg('using channel_id: ' . $channel_id);

    $this->auto_decode = $auto_decode;
    $this->channel_rpc_timeout = $channel_rpc_timeout;

    try {
        $this->x_open();
    } catch (\Exception $failure) {
        // Opening failed: release the channel before reporting the error.
        $this->close();

        throw $failure;
    }
}
|
124 | |||
/**
 * Reports whether the channel is currently flagged as opened.
 *
 * @return bool
 */
public function is_open()
{
    return $this->is_open;
}
||
132 | |||
/**
 * Tears down this object after the close has been agreed with the server.
 *
 * Removes the channel from the connection's channel list (when it still has
 * an id), drops the connection reference, marks the channel as closed and
 * forgets every registered callback.
 */
protected function do_close()
{
    $has_id = $this->channel_id !== null;
    if ($has_id) {
        unset($this->connection->channels[$this->channel_id]);
    }

    $this->connection = null;
    $this->channel_id = null;
    $this->is_open = false;
    $this->callbacks = array();
}
|
145 | |||
/**
 * Only for AMQP0.8.0
 *
 * Records a non-fatal warning sent by the server. Such warnings are used for
 * methods that are normally asynchronous and therefore have no confirmation;
 * fatal errors are handled as channel or connection exceptions instead.
 *
 * Each alert is appended to $this->alerts as array(reply code, reply text, details).
 *
 * @param AMQPReader $reader
 */
protected function channel_alert($reader)
{
    // Array elements are evaluated left to right, matching the wire order.
    $this->alerts[] = array(
        $reader->read_short(),
        $reader->read_shortstr(),
        $reader->read_table(),
    );
}
||
164 | |||
/**
 * Requests a channel close and waits for the server's channel.close_ok.
 *
 * Registered callbacks are always cleared first. When the channel is not
 * open, or has no connection, only the local teardown runs and null is
 * returned.
 *
 * @param int $reply_code
 * @param string $reply_text
 * @param array $method_sig Two-element array; both entries are passed on to the close frame
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed
 */
public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
{
    $this->callbacks = array();

    $already_closed = $this->is_open === false || $this->connection === null;
    if ($already_closed) {
        $this->do_close();

        return null; // already closed
    }

    $frame = $this->protocolWriter->channelClose(
        $reply_code,
        $reply_text,
        $method_sig[0],
        $method_sig[1]
    );
    list($class_id, $method_id, $payload) = $frame;

    try {
        $this->send_method_frame(array($class_id, $method_id), $payload);
    } catch (\Exception $failure) {
        // The request could not be sent: clean up locally, then report.
        $this->do_close();

        throw $failure;
    }

    $expected = array($this->waitHelper->get_wait('channel.close_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
201 | |||
/**
 * Handles a channel close initiated by the server.
 *
 * Reads the close reason, answers with method (20, 41), tears the channel
 * down locally and then reports the reason as an exception.
 *
 * The local teardown is performed even when the answer cannot be sent, so a
 * failed send does not leave the channel registered on the connection and
 * flagged as open. In that case the send failure is rethrown unchanged.
 *
 * @param AMQPReader $reader
 * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException carrying the server's reply code and text
 * @throws \Exception if the answer frame could not be sent
 */
protected function channel_close($reader)
{
    $reply_code = $reader->read_short();
    $reply_text = $reader->read_shortstr();
    $class_id = $reader->read_short();
    $method_id = $reader->read_short();

    try {
        $this->send_method_frame(array(20, 41));
    } catch (\Exception $e) {
        // Same recovery as close(): never skip the teardown.
        $this->do_close();

        throw $e;
    }

    $this->do_close();

    throw new AMQPProtocolChannelException($reply_code, $reply_text, array($class_id, $method_id));
}
||
218 | |||
/**
 * Confirms a channel close.
 *
 * Simply delegates to AMQPChannel::do_close().
 */
protected function channel_close_ok()
{
    $this->do_close();
}
|
227 | |||
/**
 * Enables/disables flow from peer and waits for channel.flow_ok.
 *
 * @param bool $active
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed
 */
public function flow($active)
{
    $frame = $this->protocolWriter->channelFlow($active);
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    $expected = array($this->waitHelper->get_wait('channel.flow_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
244 | |||
/**
 * Handles a flow request from the peer: stores the requested state in
 * $this->active and answers through x_flow_ok() with that same state.
 *
 * @param AMQPReader $reader
 */
protected function channel_flow($reader)
{
    $requested = $reader->read_bit();

    $this->active = $requested;
    $this->x_flow_ok($requested);
}
||
253 | |||
/**
 * Sends the answer to a flow request received from the peer.
 *
 * NOTE(review): the frame is built with protocolWriter->channelFlow(), the
 * same call flow() uses for a request; confirm whether a dedicated flow-ok
 * frame was intended here.
 *
 * @param bool $active
 */
protected function x_flow_ok($active)
{
    $frame = $this->protocolWriter->channelFlow($active);
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);
}
||
262 | |||
/**
 * Reads the single bit carried by a flow_ok reply and returns it.
 *
 * @param AMQPReader $reader
 * @return bool
 */
protected function channel_flow_ok($reader)
{
    $active = $reader->read_bit();

    return $active;
}
||
271 | |||
/**
 * Opens the channel on the server and waits for channel.open_ok.
 *
 * Does nothing and returns null when the channel is already flagged as open.
 *
 * @param string $out_of_band
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed
 */
protected function x_open($out_of_band = '')
{
    if ($this->is_open) {
        return null;
    }

    $frame = $this->protocolWriter->channelOpen($out_of_band);
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    $expected = array($this->waitHelper->get_wait('channel.open_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
290 | |||
/**
 * Marks the channel as open and logs the event through the debug helper.
 */
protected function channel_open_ok()
{
    $this->is_open = true;
    $this->debug->debug_msg('Channel open');
}
|
297 | |||
/**
 * Requests an access ticket and waits for access.request_ok.
 *
 * @param string $realm
 * @param bool $exclusive
 * @param bool $passive
 * @param bool $active
 * @param bool $write
 * @param bool $read
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed
 */
public function access_request(
    $realm,
    $exclusive = false,
    $passive = false,
    $active = false,
    $write = false,
    $read = false
) {
    $frame = $this->protocolWriter->accessRequest($realm, $exclusive, $passive, $active, $write, $read);
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    $expected = array($this->waitHelper->get_wait('access.request_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
333 | |||
/**
 * Grants access to server resources.
 *
 * Stores the ticket read from the reply as the channel's default ticket and
 * returns that same value.
 *
 * @param AMQPReader $reader
 * @return int the new default ticket
 */
protected function access_request_ok($reader)
{
    return $this->default_ticket = $reader->read_short();
}
||
346 | |||
/**
 * Declares an exchange.
 *
 * The ticket is resolved through getTicket() before the request is sent.
 * With $nowait set the method returns null right after sending; otherwise
 * it waits for exchange.declare_ok.
 *
 * @param string $exchange
 * @param string $type
 * @param bool $passive
 * @param bool $durable
 * @param bool $auto_delete
 * @param bool $internal
 * @param bool $nowait
 * @param AMQPTable|array $arguments
 * @param int|null $ticket
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed|null
 */
public function exchange_declare(
    $exchange,
    $type,
    $passive = false,
    $durable = false,
    $auto_delete = true,
    $internal = false,
    $nowait = false,
    $arguments = array(),
    $ticket = null
) {
    $ticket = $this->getTicket($ticket);

    $frame = $this->protocolWriter->exchangeDeclare(
        $ticket,
        $exchange,
        $type,
        $passive,
        $durable,
        $auto_delete,
        $internal,
        $nowait,
        $arguments
    );
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    if ($nowait) {
        return null;
    }

    $expected = array($this->waitHelper->get_wait('exchange.declare_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
397 | |||
/**
 * Confirms an exchange declaration (the reply exchange_declare() waits for).
 * No action is taken.
 */
protected function exchange_declare_ok()
{
    // Intentionally empty.
}
|
404 | |||
/**
 * Deletes an exchange.
 *
 * With $nowait set the method returns null right after sending; otherwise
 * it waits for exchange.delete_ok.
 *
 * @param string $exchange
 * @param bool $if_unused
 * @param bool $nowait
 * @param int|null $ticket
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed|null
 */
public function exchange_delete(
    $exchange,
    $if_unused = false,
    $nowait = false,
    $ticket = null
) {
    $ticket = $this->getTicket($ticket);

    $frame = $this->protocolWriter->exchangeDelete($ticket, $exchange, $if_unused, $nowait);
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    if ($nowait) {
        return null;
    }

    $expected = array($this->waitHelper->get_wait('exchange.delete_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
439 | |||
/**
 * Confirms deletion of an exchange (the reply exchange_delete() waits for).
 * No action is taken.
 */
protected function exchange_delete_ok()
{
    // Intentionally empty.
}
|
446 | |||
/**
 * Binds dest exchange to source exchange.
 *
 * With $nowait set the method returns null right after sending; otherwise
 * it waits for exchange.bind_ok.
 *
 * @param string $destination
 * @param string $source
 * @param string $routing_key
 * @param bool $nowait
 * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
 * @param int|null $ticket
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed|null
 */
public function exchange_bind(
    $destination,
    $source,
    $routing_key = '',
    $nowait = false,
    $arguments = array(),
    $ticket = null
) {
    $ticket = $this->getTicket($ticket);

    $frame = $this->protocolWriter->exchangeBind(
        $ticket,
        $destination,
        $source,
        $routing_key,
        $nowait,
        $arguments
    );
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    if ($nowait) {
        return null;
    }

    $expected = array($this->waitHelper->get_wait('exchange.bind_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
488 | |||
/**
 * Confirms a successful exchange bind (the reply exchange_bind() waits for).
 * No action is taken.
 */
protected function exchange_bind_ok()
{
    // Intentionally empty.
}
||
495 | |||
/**
 * Unbinds dest exchange from source exchange.
 *
 * The ticket is resolved through getTicket() before the request is sent.
 * With $nowait set the flag is forwarded to the server and the method
 * returns null right after sending, exactly like exchange_bind(); waiting
 * for exchange.unbind_ok in that mode would only end through the RPC
 * timeout. Without $nowait the method waits for exchange.unbind_ok.
 *
 * @param string $destination
 * @param string $source
 * @param string $routing_key
 * @param bool $nowait
 * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
 * @param int|null $ticket
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed|null
 */
public function exchange_unbind(
    $destination,
    $source,
    $routing_key = '',
    $nowait = false,
    $arguments = array(),
    $ticket = null
) {
    $ticket = $this->getTicket($ticket);

    list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(
        $ticket,
        $destination,
        $source,
        $routing_key,
        $nowait,
        $arguments
    );

    $this->send_method_frame(array($class_id, $method_id), $args);

    // No reply is requested in no-wait mode, so there is nothing to wait for.
    if ($nowait) {
        return null;
    }

    return $this->wait(array(
        $this->waitHelper->get_wait('exchange.unbind_ok')
    ), false, $this->channel_rpc_timeout);
}
||
533 | |||
/**
 * Confirms a successful exchange unbind (the reply exchange_unbind() waits for).
 * No action is taken.
 */
protected function exchange_unbind_ok()
{
    // Intentionally empty.
}
||
540 | |||
/**
 * Binds queue to an exchange.
 *
 * With $nowait set the method returns null right after sending; otherwise
 * it waits for queue.bind_ok.
 *
 * @param string $queue
 * @param string $exchange
 * @param string $routing_key
 * @param bool $nowait
 * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
 * @param int|null $ticket
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed|null
 */
public function queue_bind(
    $queue,
    $exchange,
    $routing_key = '',
    $nowait = false,
    $arguments = array(),
    $ticket = null
) {
    $ticket = $this->getTicket($ticket);

    $frame = $this->protocolWriter->queueBind(
        $ticket,
        $queue,
        $exchange,
        $routing_key,
        $nowait,
        $arguments
    );
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    if ($nowait) {
        return null;
    }

    $expected = array($this->waitHelper->get_wait('queue.bind_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
582 | |||
/**
 * Confirms a successful queue bind (the reply queue_bind() waits for).
 * No action is taken.
 */
protected function queue_bind_ok()
{
    // Intentionally empty.
}
|
589 | |||
/**
 * Unbinds queue from an exchange and waits for queue.unbind_ok.
 *
 * This method has no $nowait parameter, so it always waits for the reply.
 *
 * @param string $queue
 * @param string $exchange
 * @param string $routing_key
 * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
 * @param int|null $ticket
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed
 */
public function queue_unbind(
    $queue,
    $exchange,
    $routing_key = '',
    $arguments = array(),
    $ticket = null
) {
    $ticket = $this->getTicket($ticket);

    $frame = $this->protocolWriter->queueUnbind($ticket, $queue, $exchange, $routing_key, $arguments);
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    $expected = array($this->waitHelper->get_wait('queue.unbind_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
624 | |||
/**
 * Confirms a successful queue unbind (the reply queue_unbind() waits for).
 * No action is taken.
 */
protected function queue_unbind_ok()
{
    // Intentionally empty.
}
||
631 | |||
/**
 * Declares queue, creates if needed.
 *
 * With $nowait set the method returns null right after sending; otherwise
 * it waits for queue.declare_ok and returns what wait() yields.
 *
 * @param string $queue
 * @param bool $passive
 * @param bool $durable
 * @param bool $exclusive
 * @param bool $auto_delete
 * @param bool $nowait
 * @param array|AMQPTable $arguments
 * @param int|null $ticket
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return array|null
 */
public function queue_declare(
    $queue = '',
    $passive = false,
    $durable = false,
    $exclusive = false,
    $auto_delete = true,
    $nowait = false,
    $arguments = array(),
    $ticket = null
) {
    $ticket = $this->getTicket($ticket);

    $frame = $this->protocolWriter->queueDeclare(
        $ticket,
        $queue,
        $passive,
        $durable,
        $exclusive,
        $auto_delete,
        $nowait,
        $arguments
    );
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    if ($nowait) {
        return null;
    }

    $expected = array($this->waitHelper->get_wait('queue.declare_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
679 | |||
/**
 * Confirms a queue definition.
 *
 * Reads, in this order, the queue name (short string), the message count
 * (long) and the consumer count (long) from the reply.
 *
 * @param AMQPReader $reader
 * @return array array(queue name, message count, consumer count)
 */
protected function queue_declare_ok($reader)
{
    // Array elements are evaluated left to right, matching the wire order.
    return array(
        $reader->read_shortstr(),
        $reader->read_long(),
        $reader->read_long(),
    );
}
||
694 | |||
/**
 * Deletes a queue.
 *
 * With $nowait set the method returns null right after sending; otherwise
 * it waits for queue.delete_ok.
 *
 * @param string $queue
 * @param bool $if_unused
 * @param bool $if_empty
 * @param bool $nowait
 * @param int|null $ticket
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed|null
 */
public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
{
    $ticket = $this->getTicket($ticket);

    $frame = $this->protocolWriter->queueDelete($ticket, $queue, $if_unused, $if_empty, $nowait);
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    if ($nowait) {
        return null;
    }

    $expected = array($this->waitHelper->get_wait('queue.delete_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
728 | |||
/**
 * Confirms deletion of a queue.
 *
 * Returns the single long value carried by the reply.
 * NOTE(review): presumably the number of messages deleted; confirm against the protocol.
 *
 * @param AMQPReader $reader
 * @return string
 */
protected function queue_delete_ok($reader)
{
    $count = $reader->read_long();

    return $count;
}
||
739 | |||
/**
 * Purges a queue.
 *
 * With $nowait set the method returns null right after sending; otherwise
 * it waits for queue.purge_ok.
 *
 * @param string $queue
 * @param bool $nowait
 * @param int|null $ticket
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
 * @return mixed|null
 */
public function queue_purge($queue = '', $nowait = false, $ticket = null)
{
    $ticket = $this->getTicket($ticket);

    $frame = $this->protocolWriter->queuePurge($ticket, $queue, $nowait);
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);

    if ($nowait) {
        return null;
    }

    $expected = array($this->waitHelper->get_wait('queue.purge_ok'));

    return $this->wait($expected, false, $this->channel_rpc_timeout);
}
||
764 | |||
/**
 * Confirms a queue purge.
 *
 * Returns the single long value carried by the reply.
 * NOTE(review): presumably the number of messages purged; confirm against the protocol.
 *
 * @param AMQPReader $reader
 * @return string
 */
protected function queue_purge_ok($reader)
{
    $count = $reader->read_long();

    return $count;
}
||
775 | |||
/**
 * Acknowledges one or more messages.
 *
 * The frame is sent without waiting for any reply.
 *
 * @param int $delivery_tag
 * @param bool $multiple
 */
public function basic_ack($delivery_tag, $multiple = false)
{
    $frame = $this->protocolWriter->basicAck($delivery_tag, $multiple);
    list($class_id, $method_id, $payload) = $frame;

    $this->send_method_frame(array($class_id, $method_id), $payload);
}
|
787 | |||
/**
 * Called when the server sends a basic.ack.
 *
 * Reads the delivery tag and the "multiple" bit, then hands the matching
 * published message(s) to internal_ack_handler() together with the ack handler.
 *
 * NOTE(review): $this->ack_handler is declared as null|callable and is passed
 * on unchanged; confirm that dispatch_to_handler() tolerates a missing handler.
 *
 * @param AMQPReader $reader
 * @throws AMQPRuntimeException when the delivery tag is not among the published messages
 */
protected function basic_ack_from_server(AMQPReader $reader)
{
    $delivery_tag = $reader->read_longlong();
    $multiple = (bool) $reader->read_bit();

    if (isset($this->published_messages[$delivery_tag])) {
        $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);

        return;
    }

    throw new AMQPRuntimeException(sprintf(
        'Server ack\'ed unknown delivery_tag "%s"',
        $delivery_tag
    ));
}
|
808 | |||
809 | /** |
||
810 | * Called when the server sends a basic.nack |
||
811 | * |
||
812 | * @param AMQPReader $reader |
||
813 | * @throws AMQPRuntimeException |
||
814 | */ |
||
815 | View Code Duplication | protected function basic_nack_from_server($reader) |
|
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
816 | { |
||
817 | $delivery_tag = $reader->read_longlong(); |
||
818 | $multiple = (bool) $reader->read_bit(); |
||
819 | |||
820 | if (!isset($this->published_messages[$delivery_tag])) { |
||
821 | throw new AMQPRuntimeException(sprintf( |
||
822 | 'Server nack\'ed unknown delivery_tag "%s"', |
||
823 | $delivery_tag |
||
824 | )); |
||
825 | } |
||
826 | |||
827 | $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler); |
||
0 ignored issues
–
show
It seems like
$this->nack_handler can also be of type null ; however, PhpAmqpLib\Channel\AMQPC...:internal_ack_handler() does only seem to accept callable , maybe add an additional type check?
If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check: /**
* @return array|string
*/
function returnsDifferentValues($x) {
if ($x) {
return 'foo';
}
return array();
}
$x = returnsDifferentValues($y);
if (is_array($x)) {
// $x is an array.
}
If this is a common case that PHP Analyzer should handle natively, please let us know by opening an issue.
Loading history...
|
|||
828 | } |
||
829 | |||
830 | /** |
||
831 | * Handles the deletion of messages from $this->published_messages and dispatches them to the $handler |
||
832 | * |
||
833 | * @param int $delivery_tag |
||
834 | * @param bool $multiple |
||
835 | * @param callable $handler |
||
836 | */ |
||
837 | 3 | protected function internal_ack_handler($delivery_tag, $multiple, $handler) |
|
838 | { |
||
839 | 3 | if ($multiple) { |
|
840 | $keys = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag); |
||
841 | |||
842 | foreach ($keys as $key) { |
||
843 | $this->internal_ack_handler($key, false, $handler); |
||
844 | } |
||
845 | } else { |
||
846 | 3 | $message = $this->get_and_unset_message($delivery_tag); |
|
847 | 3 | $this->dispatch_to_handler($handler, array($message)); |
|
848 | } |
||
849 | 3 | } |
|
850 | |||
851 | /** |
||
852 | * @param AMQPMessage[] $messages |
||
853 | * @param string $value |
||
854 | * @return mixed |
||
855 | */ |
||
856 | protected function get_keys_less_or_equal(array $messages, $value) |
||
857 | { |
||
858 | $value = (int) $value; |
||
859 | $keys = array_reduce( |
||
860 | array_keys($messages), |
||
861 | /** |
||
862 | * @param string $key |
||
863 | */ |
||
864 | function ($keys, $key) use ($value) { |
||
865 | if ($key <= $value) { |
||
866 | $keys[] = $key; |
||
867 | } |
||
868 | |||
869 | return $keys; |
||
870 | }, |
||
871 | array() |
||
872 | ); |
||
873 | |||
874 | return $keys; |
||
875 | } |
||
876 | |||
877 | /** |
||
878 | * Rejects one or several received messages |
||
879 | * |
||
880 | * @param int $delivery_tag |
||
881 | * @param bool $multiple |
||
882 | * @param bool $requeue |
||
883 | */ |
||
884 | public function basic_nack($delivery_tag, $multiple = false, $requeue = false) |
||
885 | { |
||
886 | list($class_id, $method_id, $args) = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue); |
||
887 | $this->send_method_frame(array($class_id, $method_id), $args); |
||
888 | } |
||
889 | |||
890 | /** |
||
891 | * Ends a queue consumer |
||
892 | * |
||
893 | * @param string $consumer_tag |
||
894 | * @param bool $nowait |
||
895 | * @param bool $noreturn |
||
896 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded |
||
897 | * @return mixed |
||
898 | */ |
||
899 | 3 | View Code Duplication | public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false) |
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
900 | { |
||
901 | 3 | list($class_id, $method_id, $args) = $this->protocolWriter->basicCancel($consumer_tag, $nowait); |
|
902 | 3 | $this->send_method_frame(array($class_id, $method_id), $args); |
|
903 | |||
904 | 3 | if ($nowait || $noreturn) { |
|
905 | unset($this->callbacks[$consumer_tag]); |
||
906 | return $consumer_tag; |
||
907 | } |
||
908 | |||
909 | 3 | return $this->wait(array( |
|
910 | 3 | $this->waitHelper->get_wait('basic.cancel_ok') |
|
911 | 3 | ), false, $this->channel_rpc_timeout); |
|
912 | } |
||
913 | |||
914 | /** |
||
915 | * @param AMQPReader $reader |
||
916 | * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException |
||
917 | */ |
||
918 | protected function basic_cancel_from_server(AMQPReader $reader) |
||
919 | { |
||
920 | throw new AMQPBasicCancelException($reader->read_shortstr()); |
||
921 | } |
||
922 | |||
923 | /** |
||
924 | * Confirm a cancelled consumer |
||
925 | * |
||
926 | * @param AMQPReader $reader |
||
927 | * @return string |
||
928 | */ |
||
929 | 3 | protected function basic_cancel_ok($reader) |
|
930 | { |
||
931 | 3 | $consumerTag = $reader->read_shortstr(); |
|
932 | 3 | unset($this->callbacks[$consumerTag]); |
|
933 | |||
934 | 3 | return $consumerTag; |
|
935 | } |
||
936 | |||
937 | /** |
||
938 | * @return bool |
||
939 | */ |
||
940 | 1 | public function is_consuming() |
|
941 | { |
||
942 | 1 | return !empty($this->callbacks); |
|
943 | } |
||
944 | |||
945 | /** |
||
946 | * Start a queue consumer. |
||
947 | * This method asks the server to start a "consumer", which is a transient request for messages |
||
948 | * from a specific queue. |
||
949 | * Consumers last as long as the channel they were declared on, or until the client cancels them. |
||
950 | * |
||
951 | * @link https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume |
||
952 | * |
||
953 | * @param string $queue |
||
954 | * @param string $consumer_tag |
||
955 | * @param bool $no_local |
||
956 | * @param bool $no_ack |
||
957 | * @param bool $exclusive |
||
958 | * @param bool $nowait |
||
959 | * @param callable|null $callback |
||
960 | * @param int|null $ticket |
||
961 | * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments |
||
962 | * |
||
963 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded |
||
964 | * @throws \InvalidArgumentException |
||
965 | * @return string |
||
966 | */ |
||
967 | 7 | public function basic_consume( |
|
968 | $queue = '', |
||
969 | $consumer_tag = '', |
||
970 | $no_local = false, |
||
971 | $no_ack = false, |
||
972 | $exclusive = false, |
||
973 | $nowait = false, |
||
974 | $callback = null, |
||
975 | $ticket = null, |
||
976 | $arguments = array() |
||
977 | ) { |
||
978 | 7 | if (null !== $callback) { |
|
979 | 6 | Assert::isCallable($callback); |
|
980 | } |
||
981 | 6 | if ($nowait && empty($consumer_tag)) { |
|
982 | 1 | throw new \InvalidArgumentException('Cannot start consumer without consumer_tag and no-wait=true'); |
|
983 | } |
||
984 | 5 | if (!empty($consumer_tag) && array_key_exists($consumer_tag, $this->callbacks)) { |
|
985 | 1 | throw new \InvalidArgumentException('This consumer tag is already registered.'); |
|
986 | } |
||
987 | |||
988 | 5 | $ticket = $this->getTicket($ticket); |
|
989 | 5 | list($class_id, $method_id, $args) = $this->protocolWriter->basicConsume( |
|
990 | 5 | $ticket, |
|
991 | $queue, |
||
992 | $consumer_tag, |
||
993 | $no_local, |
||
994 | $no_ack, |
||
995 | $exclusive, |
||
996 | $nowait, |
||
997 | 5 | $this->protocolVersion === Wire\Constants091::VERSION ? $arguments : null |
|
998 | ); |
||
999 | |||
1000 | 5 | $this->send_method_frame(array($class_id, $method_id), $args); |
|
1001 | |||
1002 | 5 | if (false === $nowait) { |
|
1003 | 5 | $consumer_tag = $this->wait(array( |
|
1004 | 5 | $this->waitHelper->get_wait('basic.consume_ok') |
|
1005 | 5 | ), false, $this->channel_rpc_timeout); |
|
1006 | } |
||
1007 | |||
1008 | 5 | $this->callbacks[$consumer_tag] = $callback; |
|
1009 | |||
1010 | 5 | return $consumer_tag; |
|
1011 | } |
||
1012 | |||
1013 | /** |
||
1014 | * Confirms a new consumer |
||
1015 | * |
||
1016 | * @param AMQPReader $reader |
||
1017 | * @return string |
||
1018 | */ |
||
1019 | 5 | protected function basic_consume_ok($reader) |
|
1020 | { |
||
1021 | 5 | return $reader->read_shortstr(); |
|
1022 | } |
||
1023 | |||
1024 | /** |
||
1025 | * Notifies the client of a consumer message |
||
1026 | * |
||
1027 | * @param AMQPReader $reader |
||
1028 | * @param AMQPMessage $message |
||
1029 | */ |
||
1030 | 4 | protected function basic_deliver($reader, $message) |
|
1031 | { |
||
1032 | 4 | $consumer_tag = $reader->read_shortstr(); |
|
1033 | 4 | $delivery_tag = $reader->read_longlong(); |
|
1034 | 4 | $redelivered = $reader->read_bit(); |
|
1035 | 4 | $exchange = $reader->read_shortstr(); |
|
1036 | 4 | $routing_key = $reader->read_shortstr(); |
|
1037 | |||
1038 | $message |
||
1039 | 4 | ->setChannel($this) |
|
1040 | 4 | ->setDeliveryInfo($delivery_tag, $redelivered, $exchange, $routing_key) |
|
1041 | 4 | ->setConsumerTag($consumer_tag); |
|
1042 | |||
1043 | 4 | if (isset($this->callbacks[$consumer_tag])) { |
|
1044 | 4 | call_user_func($this->callbacks[$consumer_tag], $message); |
|
1045 | } |
||
1046 | 3 | } |
|
1047 | |||
1048 | /** |
||
1049 | * Direct access to a queue; returns null if no message was available in the queue |
||
1050 | * |
||
1051 | * @param string $queue |
||
1052 | * @param bool $no_ack |
||
1053 | * @param int|null $ticket |
||
1054 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded |
||
1055 | * @return AMQPMessage|null |
||
1056 | */ |
||
1057 | 8 | public function basic_get($queue = '', $no_ack = false, $ticket = null) |
|
1058 | { |
||
1059 | 8 | $ticket = $this->getTicket($ticket); |
|
1060 | 8 | list($class_id, $method_id, $args) = $this->protocolWriter->basicGet($ticket, $queue, $no_ack); |
|
1061 | |||
1062 | 8 | $this->send_method_frame(array($class_id, $method_id), $args); |
|
1063 | |||
1064 | 8 | return $this->wait(array( |
|
1065 | 8 | $this->waitHelper->get_wait('basic.get_ok'), |
|
1066 | 8 | $this->waitHelper->get_wait('basic.get_empty') |
|
1067 | 8 | ), false, $this->channel_rpc_timeout); |
|
1068 | } |
||
1069 | |||
1070 | /** |
||
1071 | * Indicates no messages available |
||
1072 | */ |
||
1073 | protected function basic_get_empty() |
||
1074 | { |
||
1075 | } |
||
1076 | |||
1077 | /** |
||
1078 | * Provides client with a message |
||
1079 | * |
||
1080 | * @param AMQPReader $reader |
||
1081 | * @param AMQPMessage $message |
||
1082 | * @return AMQPMessage |
||
1083 | */ |
||
1084 | 8 | protected function basic_get_ok($reader, $message) |
|
1085 | { |
||
1086 | 8 | $delivery_tag = $reader->read_longlong(); |
|
1087 | 8 | $redelivered = $reader->read_bit(); |
|
1088 | 8 | $exchange = $reader->read_shortstr(); |
|
1089 | 8 | $routing_key = $reader->read_shortstr(); |
|
1090 | 8 | $message_count = $reader->read_long(); |
|
1091 | |||
1092 | $message |
||
1093 | 8 | ->setChannel($this) |
|
1094 | 8 | ->setDeliveryInfo($delivery_tag, $redelivered, $exchange, $routing_key) |
|
1095 | 8 | ->setMessageCount($message_count); |
|
1096 | |||
1097 | 8 | return $message; |
|
1098 | } |
||
1099 | |||
1100 | /** |
||
1101 | * @param string $exchange |
||
1102 | * @param string $routing_key |
||
1103 | * @param bool $mandatory |
||
1104 | * @param bool $immediate |
||
1105 | * @param int $ticket |
||
1106 | * @return mixed |
||
1107 | */ |
||
1108 | 14 | private function prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket) |
|
1109 | { |
||
1110 | 14 | $cache_key = sprintf( |
|
1111 | 14 | '%s|%s|%s|%s|%s', |
|
1112 | $exchange, |
||
1113 | $routing_key, |
||
1114 | $mandatory, |
||
1115 | $immediate, |
||
1116 | $ticket |
||
1117 | ); |
||
1118 | 14 | if (false === isset($this->publish_cache[$cache_key])) { |
|
1119 | 14 | $ticket = $this->getTicket($ticket); |
|
1120 | 14 | list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish( |
|
1121 | 14 | $ticket, |
|
1122 | $exchange, |
||
1123 | $routing_key, |
||
1124 | $mandatory, |
||
1125 | $immediate |
||
1126 | ); |
||
1127 | |||
1128 | 14 | $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args); |
|
1129 | 14 | $this->publish_cache[$cache_key] = $pkt->getvalue(); |
|
1130 | 14 | if (count($this->publish_cache) > $this->publish_cache_max_size) { |
|
1131 | reset($this->publish_cache); |
||
1132 | $old_key = key($this->publish_cache); |
||
1133 | unset($this->publish_cache[$old_key]); |
||
1134 | } |
||
1135 | } |
||
1136 | |||
1137 | 14 | return $this->publish_cache[$cache_key]; |
|
1138 | } |
||
1139 | |||
1140 | /** |
||
1141 | * Publishes a message |
||
1142 | * |
||
1143 | * @param AMQPMessage $msg |
||
1144 | * @param string $exchange |
||
1145 | * @param string $routing_key |
||
1146 | * @param bool $mandatory |
||
1147 | * @param bool $immediate |
||
1148 | * @param int|null $ticket |
||
1149 | * @throws AMQPChannelClosedException |
||
1150 | * @throws AMQPConnectionClosedException |
||
1151 | * @throws AMQPConnectionBlockedException |
||
1152 | */ |
||
1153 | 15 | public function basic_publish( |
|
1154 | $msg, |
||
1155 | $exchange = '', |
||
1156 | $routing_key = '', |
||
1157 | $mandatory = false, |
||
1158 | $immediate = false, |
||
1159 | $ticket = null |
||
1160 | ) { |
||
1161 | 15 | $this->checkConnection(); |
|
1162 | 14 | $pkt = new AMQPWriter(); |
|
1163 | 14 | $pkt->write($this->prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket)); |
|
1164 | |||
1165 | try { |
||
1166 | 14 | $this->connection->send_content( |
|
1167 | 14 | $this->channel_id, |
|
1168 | 14 | 60, |
|
1169 | 14 | 0, |
|
1170 | 14 | mb_strlen($msg->body, 'ASCII'), |
|
1171 | 14 | $msg->serialize_properties(), |
|
1172 | 14 | $msg->body, |
|
1173 | $pkt |
||
1174 | ); |
||
1175 | } catch (AMQPConnectionClosedException $e) { |
||
1176 | $this->do_close(); |
||
1177 | throw $e; |
||
1178 | } |
||
1179 | |||
1180 | 14 | if ($this->next_delivery_tag > 0) { |
|
1181 | 3 | $this->published_messages[$this->next_delivery_tag] = $msg; |
|
1182 | 3 | $msg->setDeliveryInfo($this->next_delivery_tag, false, $exchange, $routing_key); |
|
1183 | 3 | $this->next_delivery_tag++; |
|
1184 | } |
||
1185 | 14 | } |
|
1186 | |||
1187 | /** |
||
1188 | * @param AMQPMessage $message |
||
1189 | * @param string $exchange |
||
1190 | * @param string $routing_key |
||
1191 | * @param bool $mandatory |
||
1192 | * @param bool $immediate |
||
1193 | * @param int|null $ticket |
||
1194 | */ |
||
1195 | public function batch_basic_publish( |
||
1196 | $message, |
||
1197 | $exchange = '', |
||
1198 | $routing_key = '', |
||
1199 | $mandatory = false, |
||
1200 | $immediate = false, |
||
1201 | $ticket = null |
||
1202 | ) { |
||
1203 | $this->batch_messages[] = [ |
||
1204 | $message, |
||
1205 | $exchange, |
||
1206 | $routing_key, |
||
1207 | $mandatory, |
||
1208 | $immediate, |
||
1209 | $ticket |
||
1210 | ]; |
||
1211 | } |
||
1212 | |||
1213 | /** |
||
1214 | * Publish batch |
||
1215 | * |
||
1216 | * @return void |
||
1217 | * @throws AMQPChannelClosedException |
||
1218 | * @throws AMQPConnectionClosedException |
||
1219 | * @throws AMQPConnectionBlockedException |
||
1220 | */ |
||
1221 | public function publish_batch() |
||
1222 | { |
||
1223 | if (empty($this->batch_messages)) { |
||
1224 | return; |
||
1225 | } |
||
1226 | |||
1227 | /** @var AMQPWriter $pkt */ |
||
1228 | $pkt = new AMQPWriter(); |
||
1229 | |||
1230 | foreach ($this->batch_messages as $m) { |
||
1231 | /** @var AMQPMessage $msg */ |
||
1232 | $msg = $m[0]; |
||
1233 | |||
1234 | $exchange = isset($m[1]) ? $m[1] : ''; |
||
1235 | $routing_key = isset($m[2]) ? $m[2] : ''; |
||
1236 | $mandatory = isset($m[3]) ? $m[3] : false; |
||
1237 | $immediate = isset($m[4]) ? $m[4] : false; |
||
1238 | $ticket = isset($m[5]) ? $m[5] : null; |
||
1239 | $pkt->write($this->prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket)); |
||
1240 | |||
1241 | $this->connection->prepare_content( |
||
1242 | $this->channel_id, |
||
1243 | 60, |
||
1244 | 0, |
||
1245 | mb_strlen($msg->body, 'ASCII'), |
||
1246 | $msg->serialize_properties(), |
||
1247 | $msg->body, |
||
1248 | $pkt |
||
1249 | ); |
||
1250 | |||
1251 | if ($this->next_delivery_tag > 0) { |
||
1252 | $this->published_messages[$this->next_delivery_tag] = $msg; |
||
1253 | $this->next_delivery_tag++; |
||
1254 | } |
||
1255 | } |
||
1256 | |||
1257 | $this->checkConnection(); |
||
1258 | $this->connection->write($pkt->getvalue()); |
||
1259 | $this->batch_messages = array(); |
||
1260 | } |
||
1261 | |||
1262 | /** |
||
1263 | * Specifies QoS |
||
1264 | * |
||
1265 | * @param int $prefetch_size |
||
1266 | * @param int $prefetch_count |
||
1267 | * @param bool $a_global |
||
1268 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded |
||
1269 | * @return mixed |
||
1270 | */ |
||
1271 | View Code Duplication | public function basic_qos($prefetch_size, $prefetch_count, $a_global) |
|
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
1272 | { |
||
1273 | list($class_id, $method_id, $args) = $this->protocolWriter->basicQos( |
||
1274 | $prefetch_size, |
||
1275 | $prefetch_count, |
||
1276 | $a_global |
||
1277 | ); |
||
1278 | |||
1279 | $this->send_method_frame(array($class_id, $method_id), $args); |
||
1280 | |||
1281 | return $this->wait(array( |
||
1282 | $this->waitHelper->get_wait('basic.qos_ok') |
||
1283 | ), false, $this->channel_rpc_timeout); |
||
1284 | } |
||
1285 | |||
1286 | /** |
||
1287 | * Confirms QoS request |
||
1288 | */ |
||
1289 | protected function basic_qos_ok() |
||
1290 | { |
||
1291 | } |
||
1292 | |||
1293 | /** |
||
1294 | * Redelivers unacknowledged messages |
||
1295 | * |
||
1296 | * @param bool $requeue |
||
1297 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded |
||
1298 | * @return mixed |
||
1299 | */ |
||
1300 | View Code Duplication | public function basic_recover($requeue = false) |
|
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
1301 | { |
||
1302 | list($class_id, $method_id, $args) = $this->protocolWriter->basicRecover($requeue); |
||
1303 | $this->send_method_frame(array($class_id, $method_id), $args); |
||
1304 | |||
1305 | return $this->wait(array( |
||
1306 | $this->waitHelper->get_wait('basic.recover_ok') |
||
1307 | ), false, $this->channel_rpc_timeout); |
||
1308 | } |
||
1309 | |||
1310 | /** |
||
1311 | * Confirm the requested recover |
||
1312 | */ |
||
1313 | protected function basic_recover_ok() |
||
1314 | { |
||
1315 | } |
||
1316 | |||
1317 | /** |
||
1318 | * Rejects an incoming message |
||
1319 | * |
||
1320 | * @param int $delivery_tag |
||
1321 | * @param bool $requeue |
||
1322 | */ |
||
1323 | public function basic_reject($delivery_tag, $requeue) |
||
1324 | { |
||
1325 | list($class_id, $method_id, $args) = $this->protocolWriter->basicReject($delivery_tag, $requeue); |
||
1326 | $this->send_method_frame(array($class_id, $method_id), $args); |
||
1327 | } |
||
1328 | |||
1329 | /** |
||
1330 | * Returns a failed message |
||
1331 | * |
||
1332 | * @param AMQPReader $reader |
||
1333 | * @param AMQPMessage $message |
||
1334 | */ |
||
1335 | protected function basic_return($reader, $message) |
||
1336 | { |
||
1337 | $callback = $this->basic_return_callback; |
||
1338 | if (!is_callable($callback)) { |
||
1339 | $this->debug->debug_msg('Skipping unhandled basic_return message'); |
||
1340 | return null; |
||
1341 | } |
||
1342 | |||
1343 | $reply_code = $reader->read_short(); |
||
1344 | $reply_text = $reader->read_shortstr(); |
||
1345 | $exchange = $reader->read_shortstr(); |
||
1346 | $routing_key = $reader->read_shortstr(); |
||
1347 | |||
1348 | call_user_func_array($callback, array( |
||
1349 | $reply_code, |
||
1350 | $reply_text, |
||
1351 | $exchange, |
||
1352 | $routing_key, |
||
1353 | $message, |
||
1354 | )); |
||
1355 | } |
||
1356 | |||
1357 | /** |
||
1358 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded |
||
1359 | * @return mixed |
||
1360 | */ |
||
1361 | View Code Duplication | public function tx_commit() |
|
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
1362 | { |
||
1363 | $this->send_method_frame(array(90, 20)); |
||
1364 | |||
1365 | return $this->wait(array( |
||
1366 | $this->waitHelper->get_wait('tx.commit_ok') |
||
1367 | ), false, $this->channel_rpc_timeout); |
||
1368 | } |
||
1369 | |||
1370 | /** |
||
1371 | * Confirms a successful commit |
||
1372 | */ |
||
1373 | protected function tx_commit_ok() |
||
1374 | { |
||
1375 | } |
||
1376 | |||
1377 | /** |
||
1378 | * Rolls back the current transaction |
||
1379 | * |
||
1380 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded |
||
1381 | * @return mixed |
||
1382 | */ |
||
1383 | View Code Duplication | public function tx_rollback() |
|
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
1384 | { |
||
1385 | $this->send_method_frame(array(90, 30)); |
||
1386 | |||
1387 | return $this->wait(array( |
||
1388 | $this->waitHelper->get_wait('tx.rollback_ok') |
||
1389 | ), false, $this->channel_rpc_timeout); |
||
1390 | } |
||
1391 | |||
1392 | /** |
||
1393 | * Confirms a successful rollback |
||
1394 | */ |
||
1395 | protected function tx_rollback_ok() |
||
1396 | { |
||
1397 | } |
||
1398 | |||
1399 | /** |
||
1400 | * Puts the channel into confirm mode |
||
1401 | * Beware that only non-transactional channels may be put into confirm mode and vice versa |
||
1402 | * |
||
1403 | * @param bool $nowait |
||
1404 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded |
||
1405 | */ |
||
1406 | 4 | public function confirm_select($nowait = false) |
|
1407 | { |
||
1408 | 4 | list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait); |
|
1409 | |||
1410 | 4 | $this->send_method_frame(array($class_id, $method_id), $args); |
|
1411 | |||
1412 | 4 | if ($nowait) { |
|
1413 | return null; |
||
1414 | } |
||
1415 | |||
1416 | 4 | $this->wait(array( |
|
1417 | 4 | $this->waitHelper->get_wait('confirm.select_ok') |
|
1418 | 4 | ), false, $this->channel_rpc_timeout); |
|
1419 | 3 | $this->next_delivery_tag = 1; |
|
1420 | 3 | } |
|
1421 | |||
1422 | /** |
||
1423 | * Confirms a selection |
||
1424 | */ |
||
1425 | 3 | public function confirm_select_ok() |
|
1426 | { |
||
1427 | 3 | } |
|
1428 | |||
1429 | /** |
||
1430 | * Waits for pending acks and nacks from the server. |
||
1431 | * If there are no pending acks, the method returns immediately |
||
1432 | * |
||
1433 | * @param int|float $timeout Waits until $timeout value is reached |
||
1434 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException |
||
1435 | * @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
||
1436 | */ |
||
1437 | 2 | View Code Duplication | public function wait_for_pending_acks($timeout = 0) |
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
1438 | { |
||
1439 | $functions = array( |
||
1440 | 2 | $this->waitHelper->get_wait('basic.ack'), |
|
1441 | 2 | $this->waitHelper->get_wait('basic.nack'), |
|
1442 | ); |
||
1443 | 2 | $timeout = max(0, $timeout); |
|
1444 | 2 | while (!empty($this->published_messages)) { |
|
1445 | 2 | $this->wait($functions, false, $timeout); |
|
1446 | } |
||
1447 | 2 | } |
|
1448 | |||
1449 | /** |
||
1450 | * Waits for pending acks, nacks and returns from the server. |
||
1451 | * If there are no pending acks, the method returns immediately. |
||
1452 | * |
||
1453 | * @param int|float $timeout If set to value > 0 the method will wait at most $timeout seconds for pending acks. |
||
1454 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException |
||
1455 | * @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
||
1456 | */ |
||
1457 | 1 | View Code Duplication | public function wait_for_pending_acks_returns($timeout = 0) |
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
1458 | { |
||
1459 | $functions = array( |
||
1460 | 1 | $this->waitHelper->get_wait('basic.ack'), |
|
1461 | 1 | $this->waitHelper->get_wait('basic.nack'), |
|
1462 | 1 | $this->waitHelper->get_wait('basic.return'), |
|
1463 | ); |
||
1464 | |||
1465 | 1 | $timeout = max(0, $timeout); |
|
1466 | 1 | while (!empty($this->published_messages)) { |
|
1467 | 1 | $this->wait($functions, false, $timeout); |
|
1468 | } |
||
1469 | 1 | } |
|
1470 | |||
1471 | /** |
||
1472 | * Selects standard transaction mode |
||
1473 | * |
||
1474 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded |
||
1475 | * @return mixed |
||
1476 | */ |
||
1477 | View Code Duplication | public function tx_select() |
|
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
1478 | { |
||
1479 | $this->send_method_frame(array(90, 10)); |
||
1480 | |||
1481 | return $this->wait(array( |
||
1482 | $this->waitHelper->get_wait('tx.select_ok') |
||
1483 | ), false, $this->channel_rpc_timeout); |
||
1484 | } |
||
1485 | |||
1486 | /** |
||
1487 | * Confirms transaction mode |
||
1488 | */ |
||
1489 | protected function tx_select_ok() |
||
1490 | { |
||
1491 | } |
||
1492 | |||
1493 | /** |
||
1494 | * @param int|null $ticket |
||
1495 | * @return int |
||
1496 | */ |
||
1497 | 34 | protected function getTicket($ticket) |
|
1498 | { |
||
1499 | 34 | return (null === $ticket) ? $this->default_ticket : $ticket; |
|
1500 | } |
||
1501 | |||
1502 | /** |
||
1503 | * Helper method to get a particular message from $this->published_messages; removes it from the array and returns it. |
||
1504 | * |
||
1505 | * @param int $index |
||
1506 | * @return AMQPMessage |
||
1507 | */ |
||
1508 | 3 | protected function get_and_unset_message($index) |
|
1509 | { |
||
1510 | 3 | $message = $this->published_messages[$index]; |
|
1511 | 3 | unset($this->published_messages[$index]); |
|
1512 | |||
1513 | 3 | return $message; |
|
1514 | } |
||
1515 | |||
1516 | /** |
||
1517 | * Sets callback for basic_return |
||
1518 | * |
||
1519 | * @param callable $callback |
||
1520 | * @throws \InvalidArgumentException if $callback is not callable |
||
1521 | */ |
||
1522 | public function set_return_listener($callback) |
||
1523 | { |
||
1524 | Assert::isCallable($callback); |
||
1525 | $this->basic_return_callback = $callback; |
||
1526 | } |
||
1527 | |||
1528 | /** |
||
1529 | * Sets a handler which is called for any message nack'ed by the server, with the AMQPMessage as first argument. |
||
1530 | * |
||
1531 | * @param callable $callback |
||
1532 | * @throws \InvalidArgumentException |
||
1533 | */ |
||
1534 | public function set_nack_handler($callback) |
||
1535 | { |
||
1536 | Assert::isCallable($callback); |
||
1537 | $this->nack_handler = $callback; |
||
1538 | } |
||
1539 | |||
1540 | /** |
||
1541 | * Sets a handler which is called for any message ack'ed by the server, with the AMQPMessage as first argument. |
||
1542 | * |
||
1543 | * @param callable $callback |
||
1544 | * @throws \InvalidArgumentException |
||
1545 | */ |
||
1546 | 3 | public function set_ack_handler($callback) |
|
1547 | { |
||
1548 | 3 | Assert::isCallable($callback); |
|
1549 | 3 | $this->ack_handler = $callback; |
|
1550 | 3 | } |
|
1551 | |||
1552 | /** |
||
1553 | * @throws AMQPChannelClosedException |
||
1554 | * @throws AMQPConnectionClosedException |
||
1555 | * @throws AMQPConnectionBlockedException |
||
1556 | */ |
||
1557 | 15 | private function checkConnection() |
|
1558 | { |
||
1559 | 15 | if ($this->connection === null || !$this->connection->isConnected()) { |
|
1560 | throw new AMQPChannelClosedException('Channel connection is closed.'); |
||
1561 | } |
||
1562 | 15 | if ($this->connection->isBlocked()) { |
|
1563 | 1 | throw new AMQPConnectionBlockedException(); |
|
1564 | } |
||
1565 | 14 | } |
|
1566 | } |
||
1567 |