Passed
Push — master ( 853986...256188 )
by Ramūnas
41:18 queued 16:15
created

AMQPChannel::closeIfDisconnected()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 4
nc 2
nop 0
dl 0
loc 8
ccs 6
cts 6
cp 1
crap 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace PhpAmqpLib\Channel;
4
5
use PhpAmqpLib\Connection\AbstractConnection;
6
use PhpAmqpLib\Exception\AMQPBasicCancelException;
7
use PhpAmqpLib\Exception\AMQPChannelClosedException;
8
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
9
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
10
use PhpAmqpLib\Exception\AMQPNoDataException;
11
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
12
use PhpAmqpLib\Exception\AMQPRuntimeException;
13
use PhpAmqpLib\Exception\AMQPTimeoutException;
14
use PhpAmqpLib\Helper\Assert;
15
use PhpAmqpLib\Message\AMQPMessage;
16
use PhpAmqpLib\Wire;
17
use PhpAmqpLib\Wire\AMQPReader;
18
use PhpAmqpLib\Wire\AMQPTable;
19
use PhpAmqpLib\Wire\AMQPWriter;
20
21
class AMQPChannel extends AbstractChannel
22
{
23
    /**
24
     * @var callable[]
25
     * @internal Use is_consuming() to check if there is active callbacks
26
     */
27
    public $callbacks = array();
28
29
    /** @var bool Whether or not the channel has been "opened" */
30
    protected $is_open = false;
31
32
    /** @var int */
33
    protected $default_ticket = 0;
34
35
    /** @var bool */
36
    protected $active = true;
37
38
    /** @var bool */
39
    protected $stopConsume = false;
40
41
    /** @var array */
42
    protected $alerts = array();
43
44
    /** @var bool */
45
    protected $auto_decode;
46
47
    /**
48
     * These parameters will be passed to function in case of basic_return:
49
     *    param int $reply_code
50
     *    param string $reply_text
51
     *    param string $exchange
52
     *    param string $routing_key
53
     *    param AMQPMessage $msg
54
     *
55
     * @var null|callable
56
     */
57
    protected $basic_return_callback;
58
59
    /** @var array Used to keep track of the messages that are going to be batch published. */
60
    protected $batch_messages = array();
61
62
    /**
63
     * If the channel is in confirm_publish mode this array will store all published messages
64
     * until they get ack'ed or nack'ed
65
     *
66
     * @var AMQPMessage[]
67
     */
68
    private $published_messages = array();
69
70
    /** @var int */
71
    private $next_delivery_tag = 0;
72
73
    /** @var null|callable */
74
    private $ack_handler;
75
76
    /** @var null|callable */
77
    private $nack_handler;
78
79
    /**
80
     * Circular buffer to speed up both basic_publish() and publish_batch().
81
     * Max size limited by $publish_cache_max_size.
82
     *
83
     * @var array
84
     * @see basic_publish()
85
     * @see publish_batch()
86
     */
87
    private $publish_cache = array();
88
89
    /**
90
     * Maximal size of $publish_cache
91
     *
92
     * @var int
93
     */
94
    private $publish_cache_max_size = 100;
95
96
    /**
97
     * Maximum time to wait for operations on this channel, in seconds.
98
     * @var float
99
     */
100
    protected $channel_rpc_timeout;
101
102
    /**
103
     * @param AbstractConnection $connection
104
     * @param int|null $channel_id
105
     * @param bool $auto_decode
106
     * @param int|float $channel_rpc_timeout
107
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
108
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
109
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
110
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
111
     */
112 36
    public function __construct($connection, $channel_id = null, $auto_decode = true, $channel_rpc_timeout = 0)
113
    {
114 36
        if ($channel_id == null) {
0 ignored issues
show
Bug Best Practice introduced by
It seems like you are loosely comparing $channel_id of type integer|null against null; this is ambiguous if the integer can be zero. Consider using a strict comparison === instead.
Loading history...
115
            $channel_id = $connection->get_free_channel_id();
116
        }
117
118 36
        parent::__construct($connection, $channel_id);
119
120 36
        $this->debug->debug_msg('using channel_id: ' . $channel_id);
121
122 36
        $this->auto_decode = $auto_decode;
123 36
        $this->channel_rpc_timeout = $channel_rpc_timeout;
124
125
        try {
126 36
            $this->x_open();
127
        } catch (\Exception $e) {
128
            $this->close();
129
            throw $e;
130
        }
131
    }
132
133
    /**
134
     * @return bool
135
     */
136 4
    public function is_open()
137
    {
138 4
        return $this->is_open;
139
    }
140
141
    /**
142
     * Tear down this object, after we've agreed to close with the server.
143
     */
144 31
    protected function do_close()
145
    {
146 31
        if ($this->channel_id !== null) {
147 31
            unset($this->connection->channels[$this->channel_id]);
148
        }
149 31
        $this->channel_id = $this->connection = null;
150 31
        $this->is_open = false;
151 31
        $this->callbacks = array();
152
    }
153
154
    /**
155
     * Only for AMQP0.8.0
156
     * This method allows the server to send a non-fatal warning to
157
     * the client.  This is used for methods that are normally
158
     * asynchronous and thus do not have confirmations, and for which
159
     * the server may detect errors that need to be reported.  Fatal
160
     * errors are handled as channel or connection exceptions; non-
161
     * fatal errors are sent through this method.
162
     *
163
     * @param AMQPReader $reader
164
     */
165
    protected function channel_alert(AMQPReader $reader): void
166
    {
167
        $reply_code = $reader->read_short();
168
        $reply_text = $reader->read_shortstr();
169
        $details = $reader->read_table();
170
        array_push($this->alerts, array($reply_code, $reply_text, $details));
171
    }
172
173
    /**
174
     * Request a channel close
175
     *
176
     * @param int $reply_code
177
     * @param string $reply_text
178
     * @param array $method_sig
179
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
180
     * @return mixed
181
     */
182 31
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
183
    {
184 31
        $this->callbacks = array();
185 31
        if ($this->is_open === false || $this->connection === null) {
186 6
            $this->do_close();
187
188 6
            return null; // already closed
189
        }
190 31
        list($class_id, $method_id, $args) = $this->protocolWriter->channelClose(
191
            $reply_code,
192
            $reply_text,
193 31
            $method_sig[0],
194 31
            $method_sig[1]
195
        );
196
197
        try {
198 31
            $this->send_method_frame(array($class_id, $method_id), $args);
199
        } catch (\Exception $e) {
200
            $this->do_close();
201
202
            throw $e;
203
        }
204
205 31
        return $this->wait(array(
206 31
            $this->waitHelper->get_wait('channel.close_ok')
207 31
        ), false, $this->channel_rpc_timeout);
208
    }
209
210
    /**
211
     * Closes a channel if no connection or a connection is closed
212
     *
213
     * @return bool
214 1
     */
215
    public function closeIfDisconnected(): bool
216 1
    {
217 1
        if (!$this->connection || $this->connection->isConnected()) {
218 1
            return false;
219 1
        }
220
221 1
        $this->do_close();
222 1
        return true;
223
    }
224 1
225
    /**
226
     * @param AMQPReader $reader
227
     * @throws AMQPProtocolChannelException
228
     */
229
    protected function channel_close(AMQPReader $reader): void
230
    {
231 31
        $reply_code = $reader->read_short();
232
        $reply_text = $reader->read_shortstr();
233 31
        $class_id = $reader->read_short();
234
        $method_id = $reader->read_short();
235
236
        $this->send_method_frame(array(20, 41));
237
        $this->do_close();
238
239
        throw new AMQPProtocolChannelException($reply_code, $reply_text, array($class_id, $method_id));
240
    }
241
242
    /**
243
     * Confirm a channel close
244
     * Alias of AMQPChannel::do_close()
245
     */
246
    protected function channel_close_ok()
247
    {
248
        $this->do_close();
249
    }
250
251
    /**
252
     * Enables/disables flow from peer
253
     *
254
     * @param bool $active
255
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
256
     * @return mixed
257
     */
258
    public function flow($active)
259
    {
260
        list($class_id, $method_id, $args) = $this->protocolWriter->channelFlow($active);
261
        $this->send_method_frame(array($class_id, $method_id), $args);
262
263
        return $this->wait(array(
264
            $this->waitHelper->get_wait('channel.flow_ok')
265
        ), false, $this->channel_rpc_timeout);
266
    }
267
268
    protected function channel_flow(AMQPReader $reader): void
269
    {
270
        $this->active = $reader->read_bit();
271
        $this->x_flow_ok($this->active);
272
    }
273
274
    /**
275
     * @param bool $active
276
     */
277
    protected function x_flow_ok($active)
278
    {
279
        list($class_id, $method_id, $args) = $this->protocolWriter->channelFlow($active);
280
        $this->send_method_frame(array($class_id, $method_id), $args);
281
    }
282
283
    protected function channel_flow_ok(AMQPReader $reader): bool
284
    {
285
        return $reader->read_bit();
286
    }
287
288 36
    /**
289
     * @param string $out_of_band
290 36
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
291
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
292
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
293
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
294 36
     * @return mixed
295 36
     */
296
    protected function x_open($out_of_band = '')
297 36
    {
298 36
        if ($this->is_open) {
299 36
            return null;
300
        }
301
302 36
        list($class_id, $method_id, $args) = $this->protocolWriter->channelOpen($out_of_band);
303
        $this->send_method_frame(array($class_id, $method_id), $args);
304 36
305
        return $this->wait(array(
306 36
            $this->waitHelper->get_wait('channel.open_ok')
307
        ), false, $this->channel_rpc_timeout);
308
    }
309
310
    protected function channel_open_ok()
311
    {
312
        $this->is_open = true;
313
314
        $this->debug->debug_msg('Channel open');
315
    }
316
317
    /**
318
     * Requests an access ticket
319
     *
320
     * @param string $realm
321
     * @param bool $exclusive
322
     * @param bool $passive
323
     * @param bool $active
324
     * @param bool $write
325
     * @param bool $read
326
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
327
     * @return mixed
328
     */
329
    public function access_request(
330
        $realm,
331
        $exclusive = false,
332
        $passive = false,
333
        $active = false,
334
        $write = false,
335
        $read = false
336
    ) {
337
        list($class_id, $method_id, $args) = $this->protocolWriter->accessRequest(
338
            $realm,
339
            $exclusive,
340
            $passive,
341
            $active,
342
            $write,
343
            $read
344
        );
345
346
        $this->send_method_frame(array($class_id, $method_id), $args);
347
348
        return $this->wait(array(
349
            $this->waitHelper->get_wait('access.request_ok')
350
        ), false, $this->channel_rpc_timeout);
351
    }
352
353
    /**
354
     * Grants access to server resources
355
     *
356
     * @param AMQPReader $reader
357
     * @return int
358
     */
359
    protected function access_request_ok(AMQPReader $reader): int
360
    {
361
        $this->default_ticket = $reader->read_short();
362
363
        return $this->default_ticket;
364
    }
365
366
    /**
367
     * Declares exchange
368
     *
369
     * @param string $exchange
370
     * @param string $type
371
     * @param bool $passive
372
     * @param bool $durable
373 29
     * @param bool $auto_delete
374
     * @param bool $internal
375
     * @param bool $nowait
376
     * @param AMQPTable|array $arguments
377
     * @param int|null $ticket
378
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
379
     * @return mixed|null
380
     */
381
    public function exchange_declare(
382
        $exchange,
383
        $type,
384 29
        $passive = false,
385
        $durable = false,
386 29
        $auto_delete = true,
387
        $internal = false,
388
        $nowait = false,
389
        $arguments = array(),
390
        $ticket = null
391
    ) {
392
        $ticket = $this->getTicket($ticket);
393
394
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeDeclare(
395
            $ticket,
396
            $exchange,
397
            $type,
398 29
            $passive,
399
            $durable,
400 25
            $auto_delete,
401
            $internal,
402
            $nowait,
403
            $arguments
404 25
        );
405 25
406 25
        $this->send_method_frame(array($class_id, $method_id), $args);
407
408
        if ($nowait) {
409
            return null;
410
        }
411
412
        return $this->wait(array(
413
            $this->waitHelper->get_wait('exchange.declare_ok')
414
        ), false, $this->channel_rpc_timeout);
415
    }
416
417
    /**
418
     * Confirms an exchange declaration
419
     */
420
    protected function exchange_declare_ok()
421
    {
422
    }
423
424
    /**
425
     * Deletes an exchange
426 9
     *
427
     * @param string $exchange
428
     * @param bool $if_unused
429
     * @param bool $nowait
430
     * @param int|null $ticket
431
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
432 9
     * @return mixed|null
433 9
     */
434
    public function exchange_delete(
435
        $exchange,
436
        $if_unused = false,
437
        $nowait = false,
438
        $ticket = null
439
    ) {
440 9
        $ticket = $this->getTicket($ticket);
441
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeDelete(
442 9
            $ticket,
443
            $exchange,
444
            $if_unused,
445
            $nowait
446 9
        );
447 9
448 9
        $this->send_method_frame(array($class_id, $method_id), $args);
449
450
        if ($nowait) {
451
            return null;
452
        }
453
454
        return $this->wait(array(
455
            $this->waitHelper->get_wait('exchange.delete_ok')
456
        ), false, $this->channel_rpc_timeout);
457
    }
458
459
    /**
460
     * Confirms deletion of an exchange
461
     */
462
    protected function exchange_delete_ok()
463
    {
464
    }
465
466
    /**
467
     * Binds dest exchange to source exchange
468
     *
469
     * @param string $destination
470
     * @param string $source
471
     * @param string $routing_key
472
     * @param bool $nowait
473
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
474
     * @param int|null $ticket
475
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
476
     * @return mixed|null
477
     */
478
    public function exchange_bind(
479
        $destination,
480
        $source,
481
        $routing_key = '',
482
        $nowait = false,
483
        $arguments = array(),
484
        $ticket = null
485
    ) {
486
        $ticket = $this->getTicket($ticket);
487
488
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeBind(
0 ignored issues
show
Bug introduced by
The method exchangeBind() does not exist on PhpAmqpLib\Helper\Protocol\Protocol080. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

488
        /** @scrutinizer ignore-call */ 
489
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeBind(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
489
            $ticket,
490
            $destination,
491
            $source,
492
            $routing_key,
493
            $nowait,
494
            $arguments
495
        );
496
497
        $this->send_method_frame(array($class_id, $method_id), $args);
498
499
        if ($nowait) {
500
            return null;
501
        }
502
503
        return $this->wait(array(
504
            $this->waitHelper->get_wait('exchange.bind_ok')
505
        ), false, $this->channel_rpc_timeout);
506
    }
507
508
    /**
509
     * Confirms bind successful
510
     */
511
    protected function exchange_bind_ok()
512
    {
513
    }
514
515
    /**
516
     * Unbinds dest exchange from source exchange
517
     *
518
     * @param string $destination
519
     * @param string $source
520
     * @param string $routing_key
521
     * @param bool $nowait
522
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
523
     * @param int|null $ticket
524
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
525
     * @return mixed
526
     */
527
    public function exchange_unbind(
528
        $destination,
529
        $source,
530
        $routing_key = '',
531
        $nowait = false,
532
        $arguments = array(),
533
        $ticket = null
534
    ) {
535
        $ticket = $this->getTicket($ticket);
536
537
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(
0 ignored issues
show
Bug introduced by
The method exchangeUnbind() does not exist on PhpAmqpLib\Helper\Protocol\Protocol080. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

537
        /** @scrutinizer ignore-call */ 
538
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
538
            $ticket,
539
            $destination,
540
            $source,
541
            $routing_key,
542
            $nowait,
543
            $arguments
544
        );
545
546
        $this->send_method_frame(array($class_id, $method_id), $args);
547
548
        return $this->wait(array(
549
            $this->waitHelper->get_wait('exchange.unbind_ok')
550
        ), false, $this->channel_rpc_timeout);
551
    }
552
553
    /**
554
     * Confirms unbind successful
555
     */
556
    protected function exchange_unbind_ok()
557
    {
558
    }
559
560
    /**
561
     * Binds queue to an exchange
562
     *
563
     * @param string $queue
564 25
     * @param string $exchange
565
     * @param string $routing_key
566
     * @param bool $nowait
567
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
568
     * @param int|null $ticket
569
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
570
     * @return mixed|null
571
     */
572 25
    public function queue_bind(
573
        $queue,
574 25
        $exchange,
575
        $routing_key = '',
576
        $nowait = false,
577
        $arguments = array(),
578
        $ticket = null
579
    ) {
580
        $ticket = $this->getTicket($ticket);
581
582
        list($class_id, $method_id, $args) = $this->protocolWriter->queueBind(
583 25
            $ticket,
584
            $queue,
585 25
            $exchange,
586
            $routing_key,
587
            $nowait,
588
            $arguments
589 25
        );
590 25
591 25
        $this->send_method_frame(array($class_id, $method_id), $args);
592
593
        if ($nowait) {
594
            return null;
595
        }
596
597
        return $this->wait(array(
598
            $this->waitHelper->get_wait('queue.bind_ok')
599
        ), false, $this->channel_rpc_timeout);
600
    }
601
602
    /**
603
     * Confirms bind successful
604
     */
605
    protected function queue_bind_ok()
606
    {
607
    }
608
609
    /**
610
     * Unbind queue from an exchange
611
     *
612
     * @param string $queue
613
     * @param string $exchange
614
     * @param string $routing_key
615
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
616
     * @param int|null $ticket
617
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
618
     * @return mixed
619
     */
620
    public function queue_unbind(
621
        $queue,
622
        $exchange,
623
        $routing_key = '',
624
        $arguments = array(),
625
        $ticket = null
626
    ) {
627
        $ticket = $this->getTicket($ticket);
628
629
        list($class_id, $method_id, $args) = $this->protocolWriter->queueUnbind(
630
            $ticket,
631
            $queue,
632
            $exchange,
633
            $routing_key,
634
            $arguments
635
        );
636
637
        $this->send_method_frame(array($class_id, $method_id), $args);
638
639
        return $this->wait(array(
640
            $this->waitHelper->get_wait('queue.unbind_ok')
641
        ), false, $this->channel_rpc_timeout);
642
    }
643
644
    /**
645
     * Confirms unbind successful
646
     */
647
    protected function queue_unbind_ok()
648
    {
649
    }
650
651
    /**
652
     * Declares queue, creates if needed
653
     *
654
     * @param string $queue
655
     * @param bool $passive
656
     * @param bool $durable
657 29
     * @param bool $exclusive
658
     * @param bool $auto_delete
659
     * @param bool $nowait
660
     * @param array|AMQPTable $arguments
661
     * @param int|null $ticket
662
     * @return array|null
663
     *@throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
664
     */
665
    public function queue_declare(
666
        $queue = '',
667 29
        $passive = false,
668
        $durable = false,
669 29
        $exclusive = false,
670
        $auto_delete = true,
671
        $nowait = false,
672
        $arguments = array(),
673
        $ticket = null
674
    ) {
675
        $ticket = $this->getTicket($ticket);
676
677
        list($class_id, $method_id, $args) = $this->protocolWriter->queueDeclare(
678
            $ticket,
679
            $queue,
680 29
            $passive,
681
            $durable,
682 29
            $exclusive,
683
            $auto_delete,
684
            $nowait,
685
            $arguments
686 29
        );
687 29
688 29
        $this->send_method_frame(array($class_id, $method_id), $args);
689
690
        if ($nowait) {
691
            return null;
692
        }
693
694
        return $this->wait(array(
695
            $this->waitHelper->get_wait('queue.declare_ok')
696
        ), false, $this->channel_rpc_timeout);
697 28
    }
698
699 28
    /**
700 28
     * Confirms a queue definition
701 28
     *
702
     * @param AMQPReader $reader
703 28
     * @return string[]
704
     */
705
    protected function queue_declare_ok(AMQPReader $reader)
706
    {
707
        $queue = $reader->read_shortstr();
708
        $message_count = $reader->read_long();
709
        $consumer_count = $reader->read_long();
710
711
        return array($queue, $message_count, $consumer_count);
712
    }
713
714
    /**
715
     * Deletes a queue
716
     *
717 7
     * @param string $queue
718
     * @param bool $if_unused
719 7
     * @param bool $if_empty
720
     * @param bool $nowait
721 7
     * @param int|null $ticket
722
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
723
     * @return mixed|null
724
     */
725
    public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
726
    {
727
        $ticket = $this->getTicket($ticket);
728
729 7
        list($class_id, $method_id, $args) = $this->protocolWriter->queueDelete(
730
            $ticket,
731 7
            $queue,
732 1
            $if_unused,
733
            $if_empty,
734
            $nowait
735 6
        );
736 6
737 6
        $this->send_method_frame(array($class_id, $method_id), $args);
738
739
        if ($nowait) {
740
            return null;
741
        }
742
743
        return $this->wait(array(
744
            $this->waitHelper->get_wait('queue.delete_ok')
745
        ), false, $this->channel_rpc_timeout);
746 6
    }
747
748 6
    /**
749
     * Confirms deletion of a queue
750
     *
751
     * @param AMQPReader $reader
752
     * @return int|string
753
     */
754
    protected function queue_delete_ok(AMQPReader $reader)
755
    {
756
        return $reader->read_long();
757
    }
758
759
    /**
760
     * Purges a queue
761
     *
762
     * @param string $queue
763
     * @param bool $nowait
764
     * @param int|null $ticket
765
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
766
     * @return mixed|null
767
     */
768
    public function queue_purge($queue = '', $nowait = false, $ticket = null)
769
    {
770
        $ticket = $this->getTicket($ticket);
771
        list($class_id, $method_id, $args) = $this->protocolWriter->queuePurge($ticket, $queue, $nowait);
772
773
        $this->send_method_frame(array($class_id, $method_id), $args);
774
775
        if ($nowait) {
776
            return null;
777
        }
778
779
        return $this->wait(array(
780
            $this->waitHelper->get_wait('queue.purge_ok')
781
        ), false, $this->channel_rpc_timeout);
782
    }
783
784
    /**
785
     * Confirms a queue purge
786
     *
787
     * @param AMQPReader $reader
788
     * @return int|string
789
     */
790
    protected function queue_purge_ok(AMQPReader $reader)
791
    {
792
        return $reader->read_long();
793 2
    }
794
795 2
    /**
796 2
     * Acknowledges one or more messages
797
     *
798
     * @param int $delivery_tag
799
     * @param bool $multiple
800
     */
801
    public function basic_ack($delivery_tag, $multiple = false)
802
    {
803
        list($class_id, $method_id, $args) = $this->protocolWriter->basicAck($delivery_tag, $multiple);
804
        $this->send_method_frame(array($class_id, $method_id), $args);
805 3
    }
806
807 3
    /**
808 3
     * Called when the server sends a basic.ack
809
     *
810 3
     * @param AMQPReader $reader
811
     * @throws AMQPRuntimeException
812
     */
813
    protected function basic_ack_from_server(AMQPReader $reader): void
814
    {
815
        $delivery_tag = $reader->read_longlong();
816
        $multiple = (bool) $reader->read_bit();
817 3
818
        if (!isset($this->published_messages[$delivery_tag])) {
819
            throw new AMQPRuntimeException(sprintf(
820
                'Server ack\'ed unknown delivery_tag "%s"',
821
                $delivery_tag
822
            ));
823
        }
824
825
        $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);
0 ignored issues
show
Bug introduced by
It seems like $delivery_tag can also be of type string; however, parameter $delivery_tag of PhpAmqpLib\Channel\AMQPC...:internal_ack_handler() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

825
        $this->internal_ack_handler(/** @scrutinizer ignore-type */ $delivery_tag, $multiple, $this->ack_handler);
Loading history...
826
    }
827
828
    /**
829
     * Called when the server sends a basic.nack
830
     *
831
     * @param AMQPReader $reader
832
     * @throws AMQPRuntimeException
833
     */
834
    protected function basic_nack_from_server(AMQPReader $reader): void
835
    {
836
        $delivery_tag = $reader->read_longlong();
837
        $multiple = (bool) $reader->read_bit();
838
839
        if (!isset($this->published_messages[$delivery_tag])) {
840
            throw new AMQPRuntimeException(sprintf(
841
                'Server nack\'ed unknown delivery_tag "%s"',
842
                $delivery_tag
843
            ));
844
        }
845
846
        $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);
0 ignored issues
show
Bug introduced by
It seems like $delivery_tag can also be of type string; however, parameter $delivery_tag of PhpAmqpLib\Channel\AMQPC...:internal_ack_handler() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

846
        $this->internal_ack_handler(/** @scrutinizer ignore-type */ $delivery_tag, $multiple, $this->nack_handler);
Loading history...
847
    }
848 3
849
    /**
850 3
     * Handles the deletion of messages from this->publishedMessages and dispatches them to the $handler
851
     *
852
     * @param int $delivery_tag
853
     * @param bool $multiple
854
     * @param callable $handler
855
     */
856
    protected function internal_ack_handler($delivery_tag, $multiple, $handler)
857 3
    {
858 3
        if ($multiple) {
859
            $keys = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag);
860
861
            foreach ($keys as $key) {
862
                $this->internal_ack_handler($key, false, $handler);
863
            }
864
        } else {
865
            $message = $this->get_and_unset_message($delivery_tag);
866
            $this->dispatch_to_handler($handler, array($message));
867
        }
868
    }
869
870
    /**
871
     * @param AMQPMessage[] $messages
872
     * @param string $value
873
     * @return mixed
874
     */
875
    protected function get_keys_less_or_equal(array $messages, $value)
876
    {
877
        $value = (int) $value;
878
        $keys = array_reduce(
879
            array_keys($messages),
880
            /**
881
             * @param string $key
882
             */
883
            function ($keys, $key) use ($value) {
884
                if ($key <= $value) {
885
                    $keys[] = $key;
886
                }
887
888
                return $keys;
889
            },
890
            array()
891
        );
892
893
        return $keys;
894
    }
895
896
    /**
897
     * Rejects one or several received messages
898
     *
899
     * @param int $delivery_tag
900
     * @param bool $multiple
901
     * @param bool $requeue
902
     */
903
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
904
    {
905
        list($class_id, $method_id, $args) = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue);
0 ignored issues
show
Bug introduced by
The method basicNack() does not exist on PhpAmqpLib\Helper\Protocol\Protocol080. Did you maybe mean basicAck()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

905
        /** @scrutinizer ignore-call */ 
906
        list($class_id, $method_id, $args) = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
906
        $this->send_method_frame(array($class_id, $method_id), $args);
907
    }
908
909
    /**
910 3
     * Ends a queue consumer
911
     *
912 3
     * @param string $consumer_tag
913 3
     * @param bool $nowait
914
     * @param bool $noreturn
915 3
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
916
     * @return mixed
917
     */
918
    public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false)
919
    {
920 3
        list($class_id, $method_id, $args) = $this->protocolWriter->basicCancel($consumer_tag, $nowait);
921 3
        $this->send_method_frame(array($class_id, $method_id), $args);
922 3
923
        if ($nowait || $noreturn) {
924
            unset($this->callbacks[$consumer_tag]);
925
            return $consumer_tag;
926
        }
927
928
        return $this->wait(array(
929
            $this->waitHelper->get_wait('basic.cancel_ok')
930
        ), false, $this->channel_rpc_timeout);
931
    }
932
933
    /**
934
     * @param AMQPReader $reader
935
     * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException
936
     */
937
    protected function basic_cancel_from_server(AMQPReader $reader)
938
    {
939
        throw new AMQPBasicCancelException($reader->read_shortstr());
940 3
    }
941
942 3
    /**
943 3
     * Confirm a cancelled consumer
944
     *
945 3
     * @param AMQPReader $reader
946
     * @return string
947
     */
948
    protected function basic_cancel_ok(AMQPReader $reader): string
949
    {
950
        $consumerTag = $reader->read_shortstr();
951 1
        unset($this->callbacks[$consumerTag]);
952
953 1
        return $consumerTag;
954
    }
955
956
    /**
957
     * @return bool
958
     */
959
    public function is_consuming()
960
    {
961
        return !empty($this->callbacks);
962
    }
963
964
    /**
965
     * Start a queue consumer.
966
     * This method asks the server to start a "consumer", which is a transient request for messages
967
     * from a specific queue.
968
     * Consumers last as long as the channel they were declared on, or until the client cancels them.
969
     *
970
     * @link https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
971
     *
972
     * @param string $queue
973
     * @param string $consumer_tag
974
     * @param bool $no_local
975
     * @param bool $no_ack
976
     * @param bool $exclusive
977
     * @param bool $nowait
978 7
     * @param callable|null $callback
979
     * @param int|null $ticket
980
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
981
     *
982
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
983
     * @throws \InvalidArgumentException
984
     * @return string
985
     */
986
    public function basic_consume(
987
        $queue = '',
988
        $consumer_tag = '',
989 7
        $no_local = false,
990 6
        $no_ack = false,
991
        $exclusive = false,
992 6
        $nowait = false,
993 1
        $callback = null,
994
        $ticket = null,
995 5
        $arguments = array()
996 1
    ) {
997
        if (null !== $callback) {
998
            Assert::isCallable($callback);
999 5
        }
1000 5
        if ($nowait && empty($consumer_tag)) {
1001
            throw new \InvalidArgumentException('Cannot start consumer without consumer_tag and no-wait=true');
1002
        }
1003
        if (!empty($consumer_tag) && array_key_exists($consumer_tag, $this->callbacks)) {
1004
            throw new \InvalidArgumentException('This consumer tag is already registered.');
1005
        }
1006
1007
        $ticket = $this->getTicket($ticket);
1008 5
        list($class_id, $method_id, $args) = $this->protocolWriter->basicConsume(
1009
            $ticket,
1010
            $queue,
1011 5
            $consumer_tag,
1012
            $no_local,
1013 5
            $no_ack,
1014 5
            $exclusive,
1015 5
            $nowait,
1016 5
            $this->protocolVersion === Wire\Constants091::VERSION ? $arguments : null
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Channel\Abstr...annel::$protocolVersion has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

1016
            /** @scrutinizer ignore-deprecated */ $this->protocolVersion === Wire\Constants091::VERSION ? $arguments : null
Loading history...
Unused Code introduced by
The call to PhpAmqpLib\Helper\Protoc...ocol080::basicConsume() has too many arguments starting with $this->protocolVersion =...ION ? $arguments : null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

1016
        /** @scrutinizer ignore-call */ 
1017
        list($class_id, $method_id, $args) = $this->protocolWriter->basicConsume(

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
1017
        );
1018
1019 5
        $this->send_method_frame(array($class_id, $method_id), $args);
1020
1021 5
        if (false === $nowait) {
1022
            $consumer_tag = $this->wait(array(
1023
                $this->waitHelper->get_wait('basic.consume_ok')
1024
            ), false, $this->channel_rpc_timeout);
1025
        }
1026
1027
        $this->callbacks[$consumer_tag] = $callback;
1028
1029
        return $consumer_tag;
1030 5
    }
1031
1032 5
    /**
1033
     * Confirms a new consumer
1034
     *
1035
     * @param AMQPReader $reader
1036
     * @return string
1037
     */
1038
    protected function basic_consume_ok(AMQPReader $reader): string
1039
    {
1040
        return $reader->read_shortstr();
1041 4
    }
1042
1043 4
    /**
1044 4
     * Notifies the client of a consumer message
1045 4
     *
1046 4
     * @param AMQPReader $reader
1047 4
     * @param AMQPMessage $message
1048
     */
1049
    protected function basic_deliver(AMQPReader $reader, AMQPMessage $message): void
1050 4
    {
1051 4
        $consumer_tag = $reader->read_shortstr();
1052 4
        $delivery_tag = $reader->read_longlong();
1053
        $redelivered = $reader->read_bit();
1054 4
        $exchange = $reader->read_shortstr();
1055 4
        $routing_key = $reader->read_shortstr();
1056
1057
        $message
1058
            ->setChannel($this)
1059
            ->setDeliveryInfo($delivery_tag, $redelivered, $exchange, $routing_key)
0 ignored issues
show
Bug introduced by
It seems like $delivery_tag can also be of type string; however, parameter $deliveryTag of PhpAmqpLib\Message\AMQPMessage::setDeliveryInfo() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

1059
            ->setDeliveryInfo(/** @scrutinizer ignore-type */ $delivery_tag, $redelivered, $exchange, $routing_key)
Loading history...
1060
            ->setConsumerTag($consumer_tag);
1061
1062
        if (isset($this->callbacks[$consumer_tag])) {
1063
            call_user_func($this->callbacks[$consumer_tag], $message);
1064
        }
1065
    }
1066
1067
    /**
1068 8
     * Direct access to a queue if no message was available in the queue, return null
1069
     *
1070 8
     * @param string $queue
1071 8
     * @param bool $no_ack
1072
     * @param int|null $ticket
1073 8
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1074
     * @return AMQPMessage|null
1075 8
     */
1076 8
    public function basic_get($queue = '', $no_ack = false, $ticket = null)
1077 8
    {
1078 8
        $ticket = $this->getTicket($ticket);
1079
        list($class_id, $method_id, $args) = $this->protocolWriter->basicGet($ticket, $queue, $no_ack);
1080
1081
        $this->send_method_frame(array($class_id, $method_id), $args);
1082
1083
        return $this->wait(array(
1084
            $this->waitHelper->get_wait('basic.get_ok'),
1085
            $this->waitHelper->get_wait('basic.get_empty')
1086
        ), false, $this->channel_rpc_timeout);
1087
    }
1088
1089
    /**
1090
     * Indicates no messages available
1091
     */
1092
    protected function basic_get_empty()
1093
    {
1094
    }
1095 8
1096
    /**
1097 8
     * Provides client with a message
1098 8
     *
1099 8
     * @param AMQPReader $reader
1100 8
     * @param AMQPMessage $message
1101 8
     * @return AMQPMessage
1102
     */
1103
    protected function basic_get_ok(AMQPReader $reader, AMQPMessage $message): AMQPMessage
1104 8
    {
1105 8
        $delivery_tag = $reader->read_longlong();
1106 8
        $redelivered = $reader->read_bit();
1107
        $exchange = $reader->read_shortstr();
1108 8
        $routing_key = $reader->read_shortstr();
1109
        $message_count = $reader->read_long();
1110
1111
        $message
1112
            ->setChannel($this)
1113
            ->setDeliveryInfo($delivery_tag, $redelivered, $exchange, $routing_key)
0 ignored issues
show
Bug introduced by
It seems like $delivery_tag can also be of type string; however, parameter $deliveryTag of PhpAmqpLib\Message\AMQPMessage::setDeliveryInfo() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

1113
            ->setDeliveryInfo(/** @scrutinizer ignore-type */ $delivery_tag, $redelivered, $exchange, $routing_key)
Loading history...
1114
            ->setMessageCount($message_count);
0 ignored issues
show
Bug introduced by
It seems like $message_count can also be of type string; however, parameter $messageCount of PhpAmqpLib\Message\AMQPMessage::setMessageCount() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

1114
            ->setMessageCount(/** @scrutinizer ignore-type */ $message_count);
Loading history...
1115
1116
        return $message;
1117
    }
1118
1119 14
    /**
1120
     * @param string $exchange
1121 14
     * @param string $routing_key
1122
     * @param bool $mandatory
1123
     * @param bool $immediate
1124
     * @param int $ticket
1125
     * @return mixed
1126
     */
1127
    private function prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket)
1128
    {
1129 14
        $cache_key = sprintf(
1130 14
            '%s|%s|%s|%s|%s',
1131 14
            $exchange,
1132
            $routing_key,
1133
            $mandatory,
1134
            $immediate,
1135
            $ticket
1136
        );
1137
        if (false === isset($this->publish_cache[$cache_key])) {
1138
            $ticket = $this->getTicket($ticket);
1139 14
            list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish(
1140 14
                $ticket,
1141 14
                $exchange,
1142
                $routing_key,
1143
                $mandatory,
1144
                $immediate
1145
            );
1146
1147
            $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args);
1148 14
            $this->publish_cache[$cache_key] = $pkt->getvalue();
1149
            if (count($this->publish_cache) > $this->publish_cache_max_size) {
1150
                reset($this->publish_cache);
1151
                $old_key = key($this->publish_cache);
1152
                unset($this->publish_cache[$old_key]);
1153
            }
1154
        }
1155
1156
        return $this->publish_cache[$cache_key];
1157
    }
1158
1159
    /**
1160
     * Publishes a message
1161
     *
1162
     * @param AMQPMessage $msg
1163
     * @param string $exchange
1164 15
     * @param string $routing_key
1165
     * @param bool $mandatory
1166
     * @param bool $immediate
1167
     * @param int|null $ticket
1168
     * @throws AMQPChannelClosedException
1169
     * @throws AMQPConnectionClosedException
1170
     * @throws AMQPConnectionBlockedException
1171
     */
1172 15
    public function basic_publish(
1173 14
        $msg,
1174 14
        $exchange = '',
1175
        $routing_key = '',
1176
        $mandatory = false,
1177 14
        $immediate = false,
1178 14
        $ticket = null
1179
    ) {
1180
        $this->checkConnection();
1181 14
        $pkt = new AMQPWriter();
1182 14
        $pkt->write($this->prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket));
1183 14
1184
        try {
1185
            $this->connection->send_content(
0 ignored issues
show
Bug introduced by
The method send_content() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

1185
            $this->connection->/** @scrutinizer ignore-call */ 
1186
                               send_content(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
1186
                $this->channel_id,
1187
                60,
1188
                0,
1189
                mb_strlen($msg->body, 'ASCII'),
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$body has been deprecated: Will be removed in version 4.0, use getBody() instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

1189
                mb_strlen(/** @scrutinizer ignore-deprecated */ $msg->body, 'ASCII'),

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
1190
                $msg->serialize_properties(),
1191 14
                $msg->body,
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$body has been deprecated: Will be removed in version 4.0, use getBody() instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

1191
                /** @scrutinizer ignore-deprecated */ $msg->body,

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
1192 3
                $pkt
1193 3
            );
1194 3
        } catch (AMQPConnectionClosedException $e) {
1195
            $this->do_close();
1196
            throw $e;
1197
        }
1198
1199
        if ($this->next_delivery_tag > 0) {
1200
            $this->published_messages[$this->next_delivery_tag] = $msg;
1201
            $msg->setDeliveryInfo($this->next_delivery_tag, false, $exchange, $routing_key);
1202
            $this->next_delivery_tag++;
1203
        }
1204
    }
1205
1206
    /**
1207
     * @param AMQPMessage $message
1208
     * @param string $exchange
1209
     * @param string $routing_key
1210
     * @param bool $mandatory
1211
     * @param bool $immediate
1212
     * @param int|null $ticket
1213
     */
1214
    public function batch_basic_publish(
1215
        $message,
1216
        $exchange = '',
1217
        $routing_key = '',
1218
        $mandatory = false,
1219
        $immediate = false,
1220
        $ticket = null
1221
    ) {
1222
        $this->batch_messages[] = [
1223
            $message,
1224
            $exchange,
1225
            $routing_key,
1226
            $mandatory,
1227
            $immediate,
1228
            $ticket
1229
        ];
1230
    }
1231
1232
    /**
1233
     * Publish batch
1234
     *
1235
     * @return void
1236
     * @throws AMQPChannelClosedException
1237
     * @throws AMQPConnectionClosedException
1238
     * @throws AMQPConnectionBlockedException
1239
     */
1240
    public function publish_batch()
1241
    {
1242
        if (empty($this->batch_messages)) {
1243
            return;
1244
        }
1245
1246
        $this->checkConnection();
1247
1248
        /** @var AMQPWriter $pkt */
1249
        $pkt = new AMQPWriter();
1250
1251
        foreach ($this->batch_messages as $m) {
1252
            /** @var AMQPMessage $msg */
1253
            $msg = $m[0];
1254
1255
            $exchange = isset($m[1]) ? $m[1] : '';
1256
            $routing_key = isset($m[2]) ? $m[2] : '';
1257
            $mandatory = isset($m[3]) ? $m[3] : false;
1258
            $immediate = isset($m[4]) ? $m[4] : false;
1259
            $ticket = isset($m[5]) ? $m[5] : null;
1260
            $pkt->write($this->prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket));
1261
1262
            $this->connection->prepare_content(
1263
                $this->channel_id,
1264
                60,
1265
                0,
1266
                mb_strlen($msg->body, 'ASCII'),
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$body has been deprecated: Will be removed in version 4.0, use getBody() instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

1266
                mb_strlen(/** @scrutinizer ignore-deprecated */ $msg->body, 'ASCII'),

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
1267
                $msg->serialize_properties(),
1268
                $msg->body,
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$body has been deprecated: Will be removed in version 4.0, use getBody() instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

1268
                /** @scrutinizer ignore-deprecated */ $msg->body,

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
1269
                $pkt
1270
            );
1271
1272
            if ($this->next_delivery_tag > 0) {
1273
                $this->published_messages[$this->next_delivery_tag] = $msg;
1274
                $this->next_delivery_tag++;
1275
            }
1276
        }
1277
1278
        $this->connection->write($pkt->getvalue());
1279
        $this->batch_messages = array();
1280
    }
1281
1282
    /**
1283
     * Specifies QoS
1284
     * 
1285
     * See https://www.rabbitmq.com/consumer-prefetch.html#overview for details
1286
     * 
1287
     * @param int $prefetch_size Default is 0 (Alias for unlimited)
1288
     * @param int $prefetch_count Default is 0 (Alias for unlimited)
1289
     * @param bool $global Default is false, prefetch size and count are applied to each channel consumer separately
1290
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1291
     * @return mixed
1292
     */
1293
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)
1294
    {
1295
        list($class_id, $method_id, $args) = $this->protocolWriter->basicQos(
1296
            $prefetch_size,
1297
            $prefetch_count,
1298
            $a_global
1299
        );
1300
1301
        $this->send_method_frame(array($class_id, $method_id), $args);
1302
1303
        return $this->wait(array(
1304
            $this->waitHelper->get_wait('basic.qos_ok')
1305
        ), false, $this->channel_rpc_timeout);
1306
    }
1307
1308
    /**
1309
     * Confirms QoS request
1310
     */
1311
    protected function basic_qos_ok()
1312
    {
1313
    }
1314
1315
    /**
1316
     * Redelivers unacknowledged messages
1317
     *
1318
     * @param bool $requeue
1319
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1320
     * @return mixed
1321
     */
1322
    public function basic_recover($requeue = false)
1323
    {
1324
        list($class_id, $method_id, $args) = $this->protocolWriter->basicRecover($requeue);
1325
        $this->send_method_frame(array($class_id, $method_id), $args);
1326
1327
        return $this->wait(array(
1328
            $this->waitHelper->get_wait('basic.recover_ok')
1329
        ), false, $this->channel_rpc_timeout);
1330
    }
1331
1332
    /**
1333
     * Confirm the requested recover
1334
     */
1335
    protected function basic_recover_ok()
1336
    {
1337
    }
1338
1339
    /**
1340
     * Rejects an incoming message
1341
     *
1342
     * @param int $delivery_tag
1343
     * @param bool $requeue
1344
     */
1345
    public function basic_reject($delivery_tag, $requeue)
1346
    {
1347
        list($class_id, $method_id, $args) = $this->protocolWriter->basicReject($delivery_tag, $requeue);
1348
        $this->send_method_frame(array($class_id, $method_id), $args);
1349
    }
1350
1351
    /**
1352
     * Returns a failed message
1353
     *
1354
     * @param AMQPReader $reader
1355
     * @param AMQPMessage $message
1356
     */
1357
    protected function basic_return(AMQPReader $reader, AMQPMessage $message)
1358
    {
1359
        $callback = $this->basic_return_callback;
1360
        if (!is_callable($callback)) {
1361
            $this->debug->debug_msg('Skipping unhandled basic_return message');
1362
            return null;
1363
        }
1364
1365
        $reply_code = $reader->read_short();
1366
        $reply_text = $reader->read_shortstr();
1367
        $exchange = $reader->read_shortstr();
1368
        $routing_key = $reader->read_shortstr();
1369
1370
        call_user_func_array($callback, array(
0 ignored issues
show
Bug introduced by
It seems like $callback can also be of type null; however, parameter $callback of call_user_func_array() does only seem to accept callable, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

1370
        call_user_func_array(/** @scrutinizer ignore-type */ $callback, array(
Loading history...
1371
            $reply_code,
1372
            $reply_text,
1373
            $exchange,
1374
            $routing_key,
1375
            $message,
1376
        ));
1377
    }
1378
1379
    /**
1380
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1381
     * @return mixed
1382
     */
1383
    public function tx_commit()
1384
    {
1385
        $this->send_method_frame(array(90, 20));
1386
1387
        return $this->wait(array(
1388
            $this->waitHelper->get_wait('tx.commit_ok')
1389
        ), false, $this->channel_rpc_timeout);
1390
    }
1391
1392
    /**
1393
     * Confirms a successful commit
1394
     */
1395
    protected function tx_commit_ok()
1396
    {
1397
    }
1398
1399
    /**
1400
     * Rollbacks the current transaction
1401
     *
1402
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1403
     * @return mixed
1404
     */
1405
    public function tx_rollback()
1406
    {
1407
        $this->send_method_frame(array(90, 30));
1408
1409
        return $this->wait(array(
1410
            $this->waitHelper->get_wait('tx.rollback_ok')
1411
        ), false, $this->channel_rpc_timeout);
1412
    }
1413
1414
    /**
1415
     * Confirms a successful rollback
1416
     */
1417 4
    protected function tx_rollback_ok()
1418
    {
1419 4
    }
1420
1421 4
    /**
1422
     * Puts the channel into confirm mode
1423 4
     * Beware that only non-transactional channels may be put into confirm mode and vice versa
1424
     *
1425
     * @param bool $nowait
1426
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1427 4
     */
1428 4
    public function confirm_select($nowait = false)
1429 4
    {
1430 3
        list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);
0 ignored issues
show
Bug introduced by
The method confirmSelect() does not exist on PhpAmqpLib\Helper\Protocol\Protocol080. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

1430
        /** @scrutinizer ignore-call */ 
1431
        list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
1431
1432
        $this->send_method_frame(array($class_id, $method_id), $args);
1433
1434
        if ($nowait) {
1435
            return null;
1436
        }
1437
1438
        $this->wait(array(
1439
            $this->waitHelper->get_wait('confirm.select_ok')
1440
        ), false, $this->channel_rpc_timeout);
1441
        $this->next_delivery_tag = 1;
1442
    }
1443
1444
    /**
1445
     * Confirms a selection
1446
     */
1447
    public function confirm_select_ok()
1448 2
    {
1449
    }
1450
1451 2
    /**
1452 2
     * Waits for pending acks and nacks from the server.
1453
     * If there are no pending acks, the method returns immediately
1454 2
     *
1455 2
     * @param int|float $timeout Waits until $timeout value is reached
1456 2
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
1457
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
1458
     */
1459
    public function wait_for_pending_acks($timeout = 0)
1460
    {
1461
        $functions = array(
1462
            $this->waitHelper->get_wait('basic.ack'),
1463
            $this->waitHelper->get_wait('basic.nack'),
1464
        );
1465
        $timeout = max(0, $timeout);
1466
        while (!empty($this->published_messages)) {
1467
            $this->wait($functions, false, $timeout);
1468 1
        }
1469
    }
1470
1471 1
    /**
1472 1
     * Waits for pending acks, nacks and returns from the server.
1473 1
     * If there are no pending acks, the method returns immediately.
1474
     *
1475
     * @param int|float $timeout If set to value > 0 the method will wait at most $timeout seconds for pending acks.
1476 1
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
1477 1
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
1478 1
     */
1479
    public function wait_for_pending_acks_returns($timeout = 0)
1480
    {
1481
        $functions = array(
1482
            $this->waitHelper->get_wait('basic.ack'),
1483
            $this->waitHelper->get_wait('basic.nack'),
1484
            $this->waitHelper->get_wait('basic.return'),
1485
        );
1486
1487
        $timeout = max(0, $timeout);
1488
        while (!empty($this->published_messages)) {
1489
            $this->wait($functions, false, $timeout);
1490
        }
1491
    }
1492
1493
    /**
1494
     * Selects standard transaction mode
1495
     *
1496
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1497
     * @return mixed
1498
     */
1499
    public function tx_select()
1500
    {
1501
        $this->send_method_frame(array(90, 10));
1502
1503
        return $this->wait(array(
1504
            $this->waitHelper->get_wait('tx.select_ok')
1505
        ), false, $this->channel_rpc_timeout);
1506
    }
1507
1508 32
    /**
1509
     * Confirms transaction mode
1510 32
     */
1511
    protected function tx_select_ok()
1512
    {
1513
    }
1514
1515
    /**
1516
     * @param int|null $ticket
1517
     * @return int
1518
     */
1519 3
    protected function getTicket($ticket)
1520
    {
1521 3
        return (null === $ticket) ? $this->default_ticket : $ticket;
1522 3
    }
1523
1524 3
    /**
1525
     * Helper method to get a particular method from $this->publishedMessages, removes it from the array and returns it.
1526
     *
1527
     * @param int $index
1528
     * @return AMQPMessage
1529
     */
1530
    protected function get_and_unset_message($index)
1531
    {
1532
        $message = $this->published_messages[$index];
1533
        unset($this->published_messages[$index]);
1534
1535
        return $message;
1536
    }
1537
1538
    /**
1539
     * Sets callback for basic_return
1540
     *
1541
     * @param  callable $callback
1542
     * @throws \InvalidArgumentException if $callback is not callable
1543
     */
1544
    public function set_return_listener($callback)
1545
    {
1546
        Assert::isCallable($callback);
1547
        $this->basic_return_callback = $callback;
1548
    }
1549
1550
    /**
1551
     * Sets a handler which called for any message nack'ed by the server, with the AMQPMessage as first argument.
1552
     *
1553
     * @param callable $callback
1554
     * @throws \InvalidArgumentException
1555
     */
1556
    public function set_nack_handler($callback)
1557 3
    {
1558
        Assert::isCallable($callback);
1559 3
        $this->nack_handler = $callback;
1560 3
    }
1561
1562
    /**
1563
     * Sets a handler which called for any message ack'ed by the server, with the AMQPMessage as first argument.
1564
     *
1565
     * @param callable $callback
1566
     * @throws \InvalidArgumentException
1567
     */
1568 15
    public function set_ack_handler($callback)
1569
    {
1570 15
        Assert::isCallable($callback);
1571
        $this->ack_handler = $callback;
1572
    }
1573 15
1574 1
    /**
1575
     * @throws AMQPChannelClosedException
1576
     * @throws AMQPConnectionBlockedException
1577
     */
1578
    private function checkConnection()
1579
    {
1580
        if ($this->connection === null || !$this->connection->isConnected()) {
1581
            throw new AMQPChannelClosedException('Channel connection is closed.');
1582
        }
1583
        if ($this->connection->isBlocked()) {
1584
            throw new AMQPConnectionBlockedException();
1585
        }
1586
    }
1587
1588
    /**
1589
     * Wait and process all incoming messages in an endless loop,
1590
     * until connection exception or manual stop using self::stopConsume()
1591
     *
1592
     * @param float $maximumPoll Maximum time in seconds between read attempts
1593
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
1594
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
1595
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
1596
     * @throws \ErrorException
1597
     * @since 3.2.0
1598
     */
1599
    public function consume(float $maximumPoll = 10.0): void
1600
    {
1601
        $this->checkConnection();
1602
1603
        if ($this->stopConsume) {
1604
            $this->stopConsume = false;
1605
            return;
1606
        }
1607
1608
        $timeout = $this->connection->getReadTimeout();
1609
        $heartBeat = $this->connection->getHeartbeat();
1610
        if ($heartBeat > 2) {
1611
            $timeout = min($timeout, floor($heartBeat / 2));
1612
        }
1613
        $timeout = max(min($timeout, $maximumPoll), 1);
1614
        while ($this->is_consuming() || !empty($this->method_queue)) {
1615
            if ($this->stopConsume) {
1616
                $this->stopConsume = false;
1617
                return;
1618
            }
1619
            try {
1620
                $this->wait(null, false, $timeout);
1621
            } catch (AMQPTimeoutException $exception) {
1622
                // something might be wrong, try to send heartbeat which involves select+write
1623
                $this->connection->checkHeartBeat();
1624
                continue;
1625
            } catch (AMQPNoDataException $exception) {
1626
                continue;
1627
            }
1628
        }
1629
    }
1630
1631
    /**
1632
     * Stop AMQPChannel::consume() loop. Useful for signal handlers and other interrupts.
1633
     * @since 3.2.0
1634
     */
1635
    public function stopConsume()
1636
    {
1637
        $this->stopConsume = true;
1638
    }
1639
}
1640