Completed
Push — master ( e1ca4d...b04e9f )
by Ramūnas
01:58
created

PhpAmqpLib/Channel/AMQPChannel.php (19 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
namespace PhpAmqpLib\Channel;
4
5
use PhpAmqpLib\Connection\AbstractConnection;
6
use PhpAmqpLib\Exception\AMQPBasicCancelException;
7
use PhpAmqpLib\Exception\AMQPChannelClosedException;
8
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
9
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
10
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
11
use PhpAmqpLib\Exception\AMQPRuntimeException;
12
use PhpAmqpLib\Helper\Assert;
13
use PhpAmqpLib\Message\AMQPMessage;
14
use PhpAmqpLib\Wire;
15
use PhpAmqpLib\Wire\AMQPReader;
16
use PhpAmqpLib\Wire\AMQPTable;
17
use PhpAmqpLib\Wire\AMQPWriter;
18
19
class AMQPChannel extends AbstractChannel
20
{
21
    /**
22
     * @var callable[]
23
     * @internal Use is_consuming() to check if there are active callbacks
24
     */
25
    public $callbacks = array();
26
27
    /** @var bool Whether or not the channel has been "opened" */
28
    protected $is_open = false;
29
30
    /** @var int */
31
    protected $default_ticket = 0;
32
33
    /** @var bool */
34
    protected $active = true;
35
36
    /** @var array */
37
    protected $alerts = array();
38
39
    /** @var bool */
40
    protected $auto_decode;
41
42
    /**
43
     * These parameters will be passed to function in case of basic_return:
44
     *    param int $reply_code
45
     *    param string $reply_text
46
     *    param string $exchange
47
     *    param string $routing_key
48
     *    param AMQPMessage $msg
49
     *
50
     * @var null|callable
51
     */
52
    protected $basic_return_callback;
53
54
    /** @var array Used to keep track of the messages that are going to be batch published. */
55
    protected $batch_messages = array();
56
57
    /**
58
     * If the channel is in confirm_publish mode this array will store all published messages
59
     * until they get ack'ed or nack'ed
60
     *
61
     * @var AMQPMessage[]
62
     */
63
    private $published_messages = array();
64
65
    /** @var int */
66
    private $next_delivery_tag = 0;
67
68
    /** @var null|callable */
69
    private $ack_handler;
70
71
    /** @var null|callable */
72
    private $nack_handler;
73
74
    /**
75
     * Circular buffer to speed up both basic_publish() and publish_batch().
76
     * Max size limited by $publish_cache_max_size.
77
     *
78
     * @var array
79
     * @see basic_publish()
80
     * @see publish_batch()
81
     */
82
    private $publish_cache = array();
83
84
    /**
85
     * Maximal size of $publish_cache
86
     *
87
     * @var int
88
     */
89
    private $publish_cache_max_size = 100;
90
91
    /**
92
     * Maximum time to wait for operations on this channel, in seconds.
93
     * @var float
94
     */
95
    protected $channel_rpc_timeout;
96
97
    /**
     * Builds the channel on top of the given connection and opens it right away.
     *
     * If no usable channel id is passed in, a free one is requested from the connection.
     * When opening fails, close() is invoked and the original exception is rethrown.
     *
     * @param AbstractConnection $connection
     * @param int|null $channel_id
     * @param bool $auto_decode
     * @param int|float $channel_rpc_timeout Timeout later handed to wait() by the RPC-style methods
     * @throws \Exception
     */
    public function __construct($connection, $channel_id = null, $auto_decode = true, $channel_rpc_timeout = 0)
    {
        // Loose comparison: null, 0 and '' (and other loosely-null values) all take the "free id" path.
        $channel_id = ($channel_id == null) ? $connection->get_free_channel_id() : $channel_id;

        parent::__construct($connection, $channel_id);

        $this->debug->debug_msg('using channel_id: ' . $channel_id);

        $this->auto_decode = $auto_decode;
        $this->channel_rpc_timeout = $channel_rpc_timeout;

        try {
            $this->x_open();
        } catch (\Exception $failure) {
            // Tear the half-built channel down before reporting the failure.
            $this->close();

            throw $failure;
        }
    }
124
125
    /**
     * Reports the current value of the channel's "open" flag.
     *
     * @return bool
     */
    public function is_open()
    {
        return $this->is_open;
    }
132
133
    /**
     * Releases local state once the close has been agreed with the server.
     *
     * Removes this channel from the connection's channel list (when a channel id is set),
     * drops the connection and channel id references, marks the channel as closed and
     * forgets all registered callbacks.
     */
    protected function do_close()
    {
        if (null !== $this->channel_id) {
            unset($this->connection->channels[$this->channel_id]);
        }

        $this->connection = null;
        $this->channel_id = null;
        $this->is_open = false;
        $this->callbacks = array();
    }
145
146
    /**
     * Only for AMQP0.8.0
     *
     * Records a non-fatal warning sent by the server. Such warnings are used for
     * normally asynchronous methods that have no confirmation of their own; fatal
     * errors arrive as channel or connection exceptions instead.
     *
     * Each alert is appended to $alerts as array(reply code, reply text, details table).
     *
     * @param AMQPReader $reader
     */
    protected function channel_alert($reader)
    {
        $code = $reader->read_short();
        $text = $reader->read_shortstr();
        $table = $reader->read_table();

        $this->alerts[] = array($code, $text, $table);
    }
164
165
    /**
     * Asks the server to close this channel.
     *
     * Callbacks are always cleared first. If the channel is not open, or has no connection,
     * only the local teardown runs and null is returned. Otherwise a channel close frame is
     * sent and the method waits for 'channel.close_ok'. If sending the frame fails, the local
     * teardown still runs before the exception is rethrown.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig Two-element array; both elements are forwarded to channelClose()
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        $this->callbacks = array();

        if (false === $this->is_open || null === $this->connection) {
            $this->do_close();

            return null; // already closed
        }

        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->channelClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );

        try {
            $this->send_method_frame(array($frame_class, $frame_method), $frame_args);
        } catch (\Exception $failure) {
            $this->do_close();

            throw $failure;
        }

        $expected = array($this->waitHelper->get_wait('channel.close_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
201
202
    /**
     * Handles a close request coming from the server.
     *
     * Reads the reply code, reply text and the class/method ids from the reader, sends a
     * reply frame, tears the channel down locally and then always throws.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException
     */
    protected function channel_close($reader)
    {
        $code = $reader->read_short();
        $text = $reader->read_shortstr();
        $failed_class = $reader->read_short();
        $failed_method = $reader->read_short();

        // NOTE(review): (20, 41) is presumably the channel.close-ok signature - confirm against the protocol tables.
        $this->send_method_frame(array(20, 41));
        $this->do_close();

        throw new AMQPProtocolChannelException($code, $text, array($failed_class, $failed_method));
    }
218
219
    /**
     * Confirms a channel close.
     *
     * Simply delegates to do_close().
     */
    protected function channel_close_ok()
    {
        $this->do_close();
    }
227
228
    /**
     * Enables or disables flow from the peer.
     *
     * Sends a channel flow frame carrying $active and waits for 'channel.flow_ok'.
     *
     * @param bool $active
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function flow($active)
    {
        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->channelFlow($active);
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        $expected = array($this->waitHelper->get_wait('channel.flow_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
244
245
    /**
     * Handles a flow request from the peer: stores the received bit in $active
     * and answers through x_flow_ok() with the stored value.
     *
     * @param AMQPReader $reader
     */
    protected function channel_flow($reader)
    {
        $this->active = $reader->read_bit();

        $this->x_flow_ok($this->active);
    }
253
254
    /**
     * Sends the reply to a peer's flow request.
     *
     * NOTE(review): the frame is built with protocolWriter->channelFlow(), i.e. the same
     * builder flow() uses, not a dedicated flow-ok builder - confirm this is intended.
     *
     * @param bool $active
     */
    protected function x_flow_ok($active)
    {
        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->channelFlow($active);

        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);
    }
262
263
    /**
     * Handles the peer's flow confirmation by returning the bit read from the reader.
     *
     * @param AMQPReader $reader
     * @return bool
     */
    protected function channel_flow_ok($reader)
    {
        $active = $reader->read_bit();

        return $active;
    }
271
272
    /**
     * Opens the channel on the server unless it is already flagged as open.
     *
     * Sends a channel open frame and waits for 'channel.open_ok'. Returns null
     * without sending anything when the channel is already open.
     *
     * @param string $out_of_band
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    protected function x_open($out_of_band = '')
    {
        if ($this->is_open) {
            return null;
        }

        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->channelOpen($out_of_band);
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        $expected = array($this->waitHelper->get_wait('channel.open_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
290
291 38
    /**
     * Marks the channel as open and writes a debug message about it.
     */
    protected function channel_open_ok()
    {
        $this->is_open = true;

        $this->debug->debug_msg('Channel open');
    }
297
298
    /**
     * Requests an access ticket.
     *
     * Sends an access request frame built from the given flags and waits for
     * 'access.request_ok'.
     *
     * @param string $realm
     * @param bool $exclusive
     * @param bool $passive
     * @param bool $active
     * @param bool $write
     * @param bool $read
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function access_request(
        $realm,
        $exclusive = false,
        $passive = false,
        $active = false,
        $write = false,
        $read = false
    ) {
        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->accessRequest(
            $realm,
            $exclusive,
            $passive,
            $active,
            $write,
            $read
        );
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        $expected = array($this->waitHelper->get_wait('access.request_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
333
334
    /**
     * Grants access to server resources.
     *
     * Stores the value read from the reader as the channel's default ticket and returns it.
     *
     * @param AMQPReader $reader
     * @return string
     */
    protected function access_request_ok($reader)
    {
        $granted_ticket = $reader->read_short();
        $this->default_ticket = $granted_ticket;

        return $granted_ticket;
    }
346
347
    /**
     * Declares an exchange.
     *
     * The ticket is resolved through getTicket(), the declare frame is sent, and unless
     * $nowait is set the method waits for 'exchange.declare_ok'.
     *
     * @param string $exchange
     * @param string $type
     * @param bool $passive
     * @param bool $durable
     * @param bool $auto_delete
     * @param bool $internal
     * @param bool $nowait When true, null is returned right after the frame is sent
     * @param AMQPTable|array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_declare(
        $exchange,
        $type,
        $passive = false,
        $durable = false,
        $auto_delete = true,
        $internal = false,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $resolved_ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->exchangeDeclare(
            $resolved_ticket,
            $exchange,
            $type,
            $passive,
            $durable,
            $auto_delete,
            $internal,
            $nowait,
            $arguments
        );
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('exchange.declare_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
397
398
    /**
     * Confirms an exchange declaration.
     *
     * Intentionally empty: nothing is read or stored for this confirmation.
     */
    protected function exchange_declare_ok()
    {
    }
404
405
    /**
     * Deletes an exchange.
     *
     * The ticket is resolved through getTicket(), the delete frame is sent, and unless
     * $nowait is set the method waits for 'exchange.delete_ok'.
     *
     * @param string $exchange
     * @param bool $if_unused
     * @param bool $nowait When true, null is returned right after the frame is sent
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_delete(
        $exchange,
        $if_unused = false,
        $nowait = false,
        $ticket = null
    ) {
        $resolved_ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->exchangeDelete(
            $resolved_ticket,
            $exchange,
            $if_unused,
            $nowait
        );
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('exchange.delete_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
439
440
    /**
     * Confirms deletion of an exchange.
     *
     * Intentionally empty: nothing is read or stored for this confirmation.
     */
    protected function exchange_delete_ok()
    {
    }
446
447
    /**
     * Binds the destination exchange to the source exchange.
     *
     * The ticket is resolved through getTicket(), the bind frame is sent, and unless
     * $nowait is set the method waits for 'exchange.bind_ok'.
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait When true, null is returned right after the frame is sent
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_bind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $resolved_ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->exchangeBind(
            $resolved_ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('exchange.bind_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
488
489
    /**
     * Confirms that an exchange bind succeeded.
     *
     * Intentionally empty: nothing is read or stored for this confirmation.
     */
    protected function exchange_bind_ok()
    {
    }
495
496
    /**
     * Unbinds the destination exchange from the source exchange.
     *
     * The ticket is resolved through getTicket() and the unbind frame is sent. Unless
     * $nowait is set, the method then waits for 'exchange.unbind_ok'.
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait When true, null is returned right after the frame is sent
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_unbind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(
            $ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        // The no-wait flag was sent to the server, so no unbind_ok will arrive; waiting
        // for one would block. Return early, as exchange_bind() does.
        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('exchange.unbind_ok')
        ), false, $this->channel_rpc_timeout);
    }
533
534
    /**
     * Confirms that an exchange unbind succeeded.
     *
     * Intentionally empty: nothing is read or stored for this confirmation.
     */
    protected function exchange_unbind_ok()
    {
    }
540
541
    /**
     * Binds a queue to an exchange.
     *
     * The ticket is resolved through getTicket(), the bind frame is sent, and unless
     * $nowait is set the method waits for 'queue.bind_ok'.
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param bool $nowait When true, null is returned right after the frame is sent
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function queue_bind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $resolved_ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->queueBind(
            $resolved_ticket,
            $queue,
            $exchange,
            $routing_key,
            $nowait,
            $arguments
        );
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('queue.bind_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
582
583
    /**
     * Confirms that a queue bind succeeded.
     *
     * Intentionally empty: nothing is read or stored for this confirmation.
     */
    protected function queue_bind_ok()
    {
    }
589
590
    /**
     * Unbinds a queue from an exchange.
     *
     * The ticket is resolved through getTicket(), the unbind frame is sent, and the
     * method waits for 'queue.unbind_ok'. There is no no-wait variant of this call.
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function queue_unbind(
        $queue,
        $exchange,
        $routing_key = '',
        $arguments = array(),
        $ticket = null
    ) {
        $resolved_ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->queueUnbind(
            $resolved_ticket,
            $queue,
            $exchange,
            $routing_key,
            $arguments
        );
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        $expected = array($this->waitHelper->get_wait('queue.unbind_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
624
625
    /**
     * Confirms that a queue unbind succeeded.
     *
     * Intentionally empty: nothing is read or stored for this confirmation.
     */
    protected function queue_unbind_ok()
    {
    }
631
632
    /**
     * Declares a queue, creating it if needed.
     *
     * The ticket is resolved through getTicket(), the declare frame is sent, and unless
     * $nowait is set the method waits for 'queue.declare_ok'.
     *
     * @param string $queue
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param bool $auto_delete
     * @param bool $nowait When true, null is returned right after the frame is sent
     * @param array|AMQPTable $arguments
     * @param int|null $ticket
     * @return array|null
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     */
    public function queue_declare(
        $queue = '',
        $passive = false,
        $durable = false,
        $exclusive = false,
        $auto_delete = true,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $resolved_ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->queueDeclare(
            $resolved_ticket,
            $queue,
            $passive,
            $durable,
            $exclusive,
            $auto_delete,
            $nowait,
            $arguments
        );
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('queue.declare_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
679
680
    /**
     * Confirms a queue definition.
     *
     * Reads, in this order, the queue name, the message count and the consumer count
     * from the reader and returns them as a three-element array in that same order.
     *
     * @param AMQPReader $reader
     * @return string[]
     */
    protected function queue_declare_ok($reader)
    {
        $name = $reader->read_shortstr();
        $messages = $reader->read_long();
        $consumers = $reader->read_long();

        return array($name, $messages, $consumers);
    }
694
695
    /**
     * Deletes a queue.
     *
     * The ticket is resolved through getTicket(), the delete frame is sent, and unless
     * $nowait is set the method waits for 'queue.delete_ok'.
     *
     * @param string $queue
     * @param bool $if_unused
     * @param bool $if_empty
     * @param bool $nowait When true, null is returned right after the frame is sent
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
    {
        $resolved_ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->queueDelete(
            $resolved_ticket,
            $queue,
            $if_unused,
            $if_empty,
            $nowait
        );
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('queue.delete_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
728
729
    /**
     * Confirms deletion of a queue.
     *
     * Returns the single value read from the reader via read_long().
     * NOTE(review): presumably the number of messages removed with the queue - confirm.
     *
     * @param AMQPReader $reader
     * @return string
     */
    protected function queue_delete_ok($reader)
    {
        $count = $reader->read_long();

        return $count;
    }
739
740
    /**
     * Purges a queue.
     *
     * The ticket is resolved through getTicket(), the purge frame is sent, and unless
     * $nowait is set the method waits for 'queue.purge_ok'.
     *
     * @param string $queue
     * @param bool $nowait When true, null is returned right after the frame is sent
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function queue_purge($queue = '', $nowait = false, $ticket = null)
    {
        $resolved_ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->queuePurge(
            $resolved_ticket,
            $queue,
            $nowait
        );
        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('queue.purge_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
764
765
    /**
     * Confirms a queue purge.
     *
     * Returns the single value read from the reader via read_long().
     * NOTE(review): presumably the number of purged messages - confirm.
     *
     * @param AMQPReader $reader
     * @return string
     */
    protected function queue_purge_ok($reader)
    {
        $count = $reader->read_long();

        return $count;
    }
775
776
    /**
     * Acknowledges one or more messages.
     *
     * Builds an ack frame from the delivery tag and the "multiple" flag and sends it.
     * Nothing is waited for and nothing is returned.
     *
     * @param int $delivery_tag
     * @param bool $multiple
     */
    public function basic_ack($delivery_tag, $multiple = false)
    {
        list($frame_class, $frame_method, $frame_args) = $this->protocolWriter->basicAck(
            $delivery_tag,
            $multiple
        );

        $this->send_method_frame(array($frame_class, $frame_method), $frame_args);
    }
787
788
    /**
     * Called when the server sends a basic.ack.
     *
     * Reads the delivery tag and the "multiple" bit, then hands the matching published
     * message(s) to the ack handler through internal_ack_handler(). The handler property
     * may still be null at this point; it is passed on unchanged.
     *
     * @param AMQPReader $reader
     * @throws AMQPRuntimeException when the delivery tag is not among the published messages
     */
    protected function basic_ack_from_server(AMQPReader $reader)
    {
        $delivery_tag = $reader->read_longlong();
        $multiple = (bool) $reader->read_bit();

        if (isset($this->published_messages[$delivery_tag])) {
            $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);

            return;
        }

        throw new AMQPRuntimeException(sprintf(
            'Server ack\'ed unknown delivery_tag "%s"',
            $delivery_tag
        ));
    }
808
809
    /**
     * Called when the server sends a basic.nack.
     *
     * Reads the delivery tag and the "multiple" bit, then hands the matching published
     * message(s) to the nack handler through internal_ack_handler(). The handler property
     * may still be null at this point; it is passed on unchanged.
     *
     * @param AMQPReader $reader
     * @throws AMQPRuntimeException when the delivery tag is not among the published messages
     */
    protected function basic_nack_from_server($reader)
    {
        $delivery_tag = $reader->read_longlong();
        $multiple = (bool) $reader->read_bit();

        if (isset($this->published_messages[$delivery_tag])) {
            $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);

            return;
        }

        throw new AMQPRuntimeException(sprintf(
            'Server nack\'ed unknown delivery_tag "%s"',
            $delivery_tag
        ));
    }
829
830
    /**
     * Removes confirmed message(s) from $this->published_messages and passes each one to $handler.
     *
     * When $multiple is set, every tracked message whose delivery tag is less than or equal to
     * $delivery_tag is processed, one at a time.
     *
     * @param int $delivery_tag
     * @param bool $multiple
     * @param callable $handler
     */
    protected function internal_ack_handler($delivery_tag, $multiple, $handler)
    {
        if (!$multiple) {
            $message = $this->get_and_unset_message($delivery_tag);
            $this->dispatch_to_handler($handler, array($message));

            return;
        }

        $tags = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag);
        foreach ($tags as $tag) {
            // Re-enter in single-message mode for each covered tag.
            $this->internal_ack_handler($tag, false, $handler);
        }
    }
850
851
    /**
     * Collects the keys of $messages that are less than or equal to $value.
     *
     * $value is cast to int before comparing; keys are returned in the order they appear in $messages.
     *
     * @param AMQPMessage[] $messages
     * @param string $value
     * @return mixed list of matching keys (an empty array when nothing matches)
     */
    protected function get_keys_less_or_equal(array $messages, $value)
    {
        $limit = (int) $value;
        $selected = array();

        foreach (array_keys($messages) as $tag) {
            if ($tag <= $limit) {
                $selected[] = $tag;
            }
        }

        return $selected;
    }
876
877
    /**
     * Sends a basic.nack to reject one or several received messages.
     *
     * @param int $delivery_tag
     * @param bool $multiple
     * @param bool $requeue
     */
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
    {
        $frame = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue);
        list($class_id, $method_id, $payload) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $payload);
    }
889
890
    /**
     * Cancels a queue consumer.
     *
     * With $nowait or $noreturn set, the local callback is dropped immediately and the consumer
     * tag is returned without waiting; otherwise the call blocks for basic.cancel_ok.
     *
     * @param string $consumer_tag
     * @param bool $nowait
     * @param bool $noreturn
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false)
    {
        list($class_id, $method_id, $payload) = $this->protocolWriter->basicCancel($consumer_tag, $nowait);
        $this->send_method_frame(array($class_id, $method_id), $payload);

        $await_reply = !$nowait && !$noreturn;
        if ($await_reply) {
            $expected = array($this->waitHelper->get_wait('basic.cancel_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        unset($this->callbacks[$consumer_tag]);

        return $consumer_tag;
    }
913
914
    /**
     * Handles a basic.cancel sent by the server by raising an exception
     * built from the short string read from the frame.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException always
     */
    protected function basic_cancel_from_server(AMQPReader $reader)
    {
        $consumer_tag = $reader->read_shortstr();

        throw new AMQPBasicCancelException($consumer_tag);
    }
922
923
    /**
     * Handles basic.cancel_ok: forgets the callback registered for the cancelled consumer.
     *
     * @param AMQPReader $reader
     * @return string the consumer tag read from the frame
     */
    protected function basic_cancel_ok($reader)
    {
        $cancelled_tag = $reader->read_shortstr();

        unset($this->callbacks[$cancelled_tag]);

        return $cancelled_tag;
    }
936
937
    /**
     * Tells whether at least one consumer callback is currently registered on this channel.
     *
     * @return bool
     */
    public function is_consuming()
    {
        return false === empty($this->callbacks);
    }
944
945
    /**
     * Start a queue consumer.
     * This method asks the server to start a "consumer", which is a transient request for messages
     * from a specific queue.
     * Consumers last as long as the channel they were declared on, or until the client cancels them.
     *
     * Unless $nowait is set, the call blocks for basic.consume_ok and uses the consumer tag
     * carried by that reply. The callback (possibly null) is then stored under that tag.
     *
     * @link https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
     *
     * @param string $queue
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack
     * @param bool $exclusive
     * @param bool $nowait
     * @param callable|null $callback
     * @param int|null $ticket
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @throws \InvalidArgumentException if $callback is not callable, if $nowait is used without
     *                                   a consumer tag, or if the consumer tag is already registered
     * @return string
     */
    public function basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array()
    ) {
        if (null !== $callback) {
            Assert::isCallable($callback);
        }
        if ($nowait && empty($consumer_tag)) {
            throw new \InvalidArgumentException('Cannot start consumer without consumer_tag and no-wait=true');
        }
        if (!empty($consumer_tag) && array_key_exists($consumer_tag, $this->callbacks)) {
            throw new \InvalidArgumentException('This consumer tag is already registered.');
        }

        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->basicConsume(
            $ticket,
            $queue,
            $consumer_tag,
            $no_local,
            $no_ack,
            $exclusive,
            $nowait,
            // Consumer arguments are only passed on for protocol version 0.9.1.
            $this->protocolVersion === Wire\Constants091::VERSION ? $arguments : null
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        // Use the same truthiness test as the validation above. A strict `false === $nowait`
        // would skip the wait for falsy non-boolean values (0, null) and then register the
        // callback under the caller-supplied, possibly empty, consumer tag.
        if (!$nowait) {
            $consumer_tag = $this->wait(array(
                $this->waitHelper->get_wait('basic.consume_ok')
            ), false, $this->channel_rpc_timeout);
        }

        $this->callbacks[$consumer_tag] = $callback;

        return $consumer_tag;
    }
1012
1013
    /**
     * Handles basic.consume_ok, the confirmation of a new consumer.
     *
     * @param AMQPReader $reader
     * @return string the consumer tag read from the frame
     */
    protected function basic_consume_ok($reader)
    {
        $consumer_tag = $reader->read_shortstr();

        return $consumer_tag;
    }
1023
1024
    /**
     * Handles basic.deliver: fills in the delivery details on the message and hands it
     * to the callback registered for its consumer tag, if there is one.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     */
    protected function basic_deliver($reader, $message)
    {
        // Fields are read in the order they appear in the frame.
        $tag_of_consumer = $reader->read_shortstr();
        $tag_of_delivery = $reader->read_longlong();
        $was_redelivered = $reader->read_bit();
        $source_exchange = $reader->read_shortstr();
        $key = $reader->read_shortstr();

        $message
            ->setChannel($this)
            ->setDeliveryInfo($tag_of_delivery, $was_redelivered, $source_exchange, $key)
            ->setConsumerTag($tag_of_consumer);

        if (!isset($this->callbacks[$tag_of_consumer])) {
            return;
        }

        call_user_func($this->callbacks[$tag_of_consumer], $message);
    }
1047
1048
    /**
     * Fetches a single message directly from a queue.
     *
     * Sends basic.get and waits for either basic.get_ok or basic.get_empty;
     * null is returned when no message was available in the queue.
     *
     * @param string $queue
     * @param bool $no_ack
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return AMQPMessage|null
     */
    public function basic_get($queue = '', $no_ack = false, $ticket = null)
    {
        $effective_ticket = $this->getTicket($ticket);
        $frame = $this->protocolWriter->basicGet($effective_ticket, $queue, $no_ack);
        list($class_id, $method_id, $payload) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $payload);

        $expected = array(
            $this->waitHelper->get_wait('basic.get_ok'),
            $this->waitHelper->get_wait('basic.get_empty'),
        );

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1069
1070
    /**
     * Handles basic.get_empty, which indicates that no messages are available.
     */
    protected function basic_get_empty()
    {
        // Intentionally empty: there is nothing to read or return for this reply.
    }
1076
1077
    /**
     * Handles basic.get_ok: fills in the delivery details and the message count on the
     * fetched message.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     * @return AMQPMessage the same message, populated
     */
    protected function basic_get_ok($reader, $message)
    {
        // Fields are read in the order they appear in the frame.
        $tag = $reader->read_longlong();
        $was_redelivered = $reader->read_bit();
        $source_exchange = $reader->read_shortstr();
        $key = $reader->read_shortstr();
        $remaining = $reader->read_long();

        $message
            ->setChannel($this)
            ->setDeliveryInfo($tag, $was_redelivered, $source_exchange, $key)
            ->setMessageCount($remaining);

        return $message;
    }
1099
1100
    /**
     * Builds the serialized basic.publish method frame for the given arguments, or returns
     * it from $this->publish_cache if it has already been built.
     *
     * The cache is bounded by $this->publish_cache_max_size; when the limit is exceeded the
     * first (oldest inserted) entry is dropped.
     *
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int $ticket
     * @return mixed the value produced by getvalue() on the prepared method frame
     */
    private function prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket)
    {
        // serialize() length-prefixes every string, so two different argument tuples can never
        // share a key. A plain '|'-joined key collides as soon as an exchange or routing key
        // contains '|' (e.g. 'a' + 'b|c' versus 'a|b' + 'c'), returning the wrong cached frame.
        $cache_key = serialize(array($exchange, $routing_key, $mandatory, $immediate, $ticket));

        if (isset($this->publish_cache[$cache_key])) {
            return $this->publish_cache[$cache_key];
        }

        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish(
            $ticket,
            $exchange,
            $routing_key,
            $mandatory,
            $immediate
        );

        $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args);
        $frame = $pkt->getvalue();
        $this->publish_cache[$cache_key] = $frame;

        if (count($this->publish_cache) > $this->publish_cache_max_size) {
            reset($this->publish_cache);
            $old_key = key($this->publish_cache);
            unset($this->publish_cache[$old_key]);
        }

        // Return the local copy rather than re-reading the cache: with a size limit of 0 the
        // entry stored above is the very one that has just been evicted.
        return $frame;
    }
1139
1140
    /**
     * Publishes a message.
     *
     * When delivery-tag tracking is active ($this->next_delivery_tag > 0) the message is also
     * recorded in $this->published_messages and stamped with its delivery tag.
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int|null $ticket
     * @throws AMQPChannelClosedException
     * @throws AMQPConnectionClosedException
     * @throws AMQPConnectionBlockedException
     */
    public function basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        $this->checkConnection();

        $method_frame = new AMQPWriter();
        $method_frame->write($this->prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket));

        try {
            $this->connection->send_content(
                $this->channel_id,
                60,
                0,
                // The 'ASCII' encoding makes mb_strlen() count bytes rather than characters.
                mb_strlen($msg->body, 'ASCII'),
                $msg->serialize_properties(),
                $msg->body,
                $method_frame
            );
        } catch (AMQPConnectionClosedException $closed) {
            // Close this channel locally before letting the caller see the failure.
            $this->do_close();
            throw $closed;
        }

        if (!($this->next_delivery_tag > 0)) {
            return;
        }

        $this->published_messages[$this->next_delivery_tag] = $msg;
        $msg->setDeliveryInfo($this->next_delivery_tag, false, $exchange, $routing_key);
        $this->next_delivery_tag++;
    }
1186
1187
    /**
     * Queues a message for a later publish_batch() call; nothing is sent here.
     *
     * @param AMQPMessage $message
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int|null $ticket
     */
    public function batch_basic_publish(
        $message,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        // Positional layout expected by publish_batch().
        $entry = array($message, $exchange, $routing_key, $mandatory, $immediate, $ticket);

        $this->batch_messages[] = $entry;
    }
1212
1213
    /**
     * Publishes every message queued with batch_basic_publish() in a single write.
     *
     * Returns immediately when the batch is empty. When delivery-tag tracking is active
     * ($this->next_delivery_tag > 0) each message is recorded in $this->published_messages
     * and stamped with its delivery tag, exactly as basic_publish() does.
     *
     * @return void
     * @throws AMQPChannelClosedException
     * @throws AMQPConnectionClosedException
     * @throws AMQPConnectionBlockedException
     */
    public function publish_batch()
    {
        if (empty($this->batch_messages)) {
            return;
        }

        // Validate the connection before it is dereferenced below and before any tracking
        // state is changed; otherwise a closed or blocked connection would leave messages
        // registered as published although nothing was written.
        $this->checkConnection();

        /** @var AMQPWriter $pkt */
        $pkt = new AMQPWriter();

        foreach ($this->batch_messages as $m) {
            /** @var AMQPMessage $msg */
            $msg = $m[0];

            $exchange = isset($m[1]) ? $m[1] : '';
            $routing_key = isset($m[2]) ? $m[2] : '';
            $mandatory = isset($m[3]) ? $m[3] : false;
            $immediate = isset($m[4]) ? $m[4] : false;
            $ticket = isset($m[5]) ? $m[5] : null;
            $pkt->write($this->prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket));

            $this->connection->prepare_content(
                $this->channel_id,
                60,
                0,
                // The 'ASCII' encoding makes mb_strlen() count bytes rather than characters.
                mb_strlen($msg->body, 'ASCII'),
                $msg->serialize_properties(),
                $msg->body,
                $pkt
            );

            if ($this->next_delivery_tag > 0) {
                $this->published_messages[$this->next_delivery_tag] = $msg;
                // Keep batch publishing consistent with basic_publish(): the message carries
                // its own delivery tag, exchange and routing key.
                $msg->setDeliveryInfo($this->next_delivery_tag, false, $exchange, $routing_key);
                $this->next_delivery_tag++;
            }
        }

        $this->connection->write($pkt->getvalue());
        $this->batch_messages = array();
    }
1261
1262
    /**
     * Specifies quality of service: sends basic.qos and waits for basic.qos_ok.
     *
     * @param int $prefetch_size
     * @param int $prefetch_count
     * @param bool $a_global
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)
    {
        $frame = $this->protocolWriter->basicQos($prefetch_size, $prefetch_count, $a_global);
        list($class_id, $method_id, $payload) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $payload);

        $expected = array($this->waitHelper->get_wait('basic.qos_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1285
1286
    /**
     * Handles basic.qos_ok, the confirmation of a QoS request.
     */
    protected function basic_qos_ok()
    {
        // Intentionally empty: the confirmation carries nothing to process.
    }
1292
1293
    /**
     * Asks the server to redeliver unacknowledged messages and waits for basic.recover_ok.
     *
     * @param bool $requeue
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_recover($requeue = false)
    {
        $frame = $this->protocolWriter->basicRecover($requeue);
        list($class_id, $method_id, $payload) = $frame;
        $this->send_method_frame(array($class_id, $method_id), $payload);

        $expected = array($this->waitHelper->get_wait('basic.recover_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1309
1310
    /**
     * Handles basic.recover_ok, the confirmation of a requested recover.
     */
    protected function basic_recover_ok()
    {
        // Intentionally empty: the confirmation carries nothing to process.
    }
1316
1317
    /**
     * Sends a basic.reject for an incoming message; no reply is awaited.
     *
     * @param int $delivery_tag
     * @param bool $requeue
     */
    public function basic_reject($delivery_tag, $requeue)
    {
        $frame = $this->protocolWriter->basicReject($delivery_tag, $requeue);
        list($class_id, $method_id, $payload) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $payload);
    }
1328
1329
    /**
     * Handles basic.return (a message the server could not route or deliver).
     *
     * When a callable return listener is set it is invoked with
     * (reply code, reply text, exchange, routing key, message); otherwise the frame is
     * skipped with a debug message and nothing is read from $reader.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     */
    protected function basic_return($reader, $message)
    {
        $listener = $this->basic_return_callback;
        if (!is_callable($listener)) {
            $this->debug->debug_msg('Skipping unhandled basic_return message');

            return null;
        }

        // Array elements are evaluated top to bottom, which matches the frame field order.
        $listener_args = array(
            $reader->read_short(),    // reply code
            $reader->read_shortstr(), // reply text
            $reader->read_shortstr(), // exchange
            $reader->read_shortstr(), // routing key
            $message,
        );

        call_user_func_array($listener, $listener_args);
    }
1356
1357
    /**
     * Commits the current transaction: sends method frame (90, 20) and waits for tx.commit_ok.
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function tx_commit()
    {
        $this->send_method_frame(array(90, 20));

        $expected = array($this->waitHelper->get_wait('tx.commit_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1369
1370
    /**
     * Handles tx.commit_ok, the confirmation of a successful commit.
     */
    protected function tx_commit_ok()
    {
        // Intentionally empty: the confirmation carries nothing to process.
    }
1376
1377
    /**
     * Rolls back the current transaction: sends method frame (90, 30) and waits for tx.rollback_ok.
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function tx_rollback()
    {
        $this->send_method_frame(array(90, 30));

        $expected = array($this->waitHelper->get_wait('tx.rollback_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1391
1392
    /**
     * Handles tx.rollback_ok, the confirmation of a successful rollback.
     */
    protected function tx_rollback_ok()
    {
        // Intentionally empty: the confirmation carries nothing to process.
    }
1398
1399
    /**
     * Puts the channel into confirm mode
     * Beware that only non-transactional channels may be put into confirm mode and vice versa
     *
     * Delivery-tag tracking starts at 1 once the request has been sent, whether or not a
     * reply is awaited, so that subsequent publishes are recorded in
     * $this->published_messages.
     *
     * @param bool $nowait when set, confirm.select_ok is not awaited
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return null
     */
    public function confirm_select($nowait = false)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (!$nowait) {
            $this->wait(array(
                $this->waitHelper->get_wait('confirm.select_ok')
            ), false, $this->channel_rpc_timeout);
        }

        // Start tracking in the no-wait case as well: the server confirms publishes from this
        // point on, and an ack/nack for a message that was never recorded would otherwise be
        // rejected with an "unknown delivery_tag" AMQPRuntimeException.
        $this->next_delivery_tag = 1;

        return null;
    }
1421
1422
    /**
     * Handles confirm.select_ok, the confirmation of a confirm-mode selection.
     */
    public function confirm_select_ok()
    {
        // Intentionally empty: the confirmation carries nothing to process.
    }
1428
1429
    /**
     * Waits for pending acks and nacks from the server.
     * Returns immediately when no published messages are awaiting confirmation.
     *
     * @param int|float $timeout Passed to each individual wait() call; negative values are treated as 0
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    public function wait_for_pending_acks($timeout = 0)
    {
        $timeout = max(0, $timeout);
        $expected = array(
            $this->waitHelper->get_wait('basic.ack'),
            $this->waitHelper->get_wait('basic.nack'),
        );

        // The ack/nack handlers remove entries from published_messages, which ends the loop.
        while (false === empty($this->published_messages)) {
            $this->wait($expected, false, $timeout);
        }
    }
1448
1449
    /**
     * Waits for pending acks, nacks and returns from the server.
     * Returns immediately when no published messages are awaiting confirmation.
     *
     * @param int|float $timeout Passed to each individual wait() call; negative values are treated as 0
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    public function wait_for_pending_acks_returns($timeout = 0)
    {
        $timeout = max(0, $timeout);
        $expected = array(
            $this->waitHelper->get_wait('basic.ack'),
            $this->waitHelper->get_wait('basic.nack'),
            $this->waitHelper->get_wait('basic.return'),
        );

        // The ack/nack handlers remove entries from published_messages, which ends the loop.
        while (false === empty($this->published_messages)) {
            $this->wait($expected, false, $timeout);
        }
    }
1470
1471
    /**
     * Selects standard transaction mode: sends method frame (90, 10) and waits for tx.select_ok.
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function tx_select()
    {
        $this->send_method_frame(array(90, 10));

        $expected = array($this->waitHelper->get_wait('tx.select_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1485
1486
    /**
     * Handles tx.select_ok, the confirmation of transaction mode.
     */
    protected function tx_select_ok()
    {
        // Intentionally empty: the confirmation carries nothing to process.
    }
1492
1493
    /**
     * Resolves the ticket to use: the given one, or the channel default when null is passed.
     *
     * @param int|null $ticket
     * @return int
     */
    protected function getTicket($ticket)
    {
        if (null === $ticket) {
            return $this->default_ticket;
        }

        return $ticket;
    }
1501
1502
    /**
     * Takes a message out of $this->published_messages: returns the entry stored under
     * $index and removes it from the array.
     *
     * @param int $index
     * @return AMQPMessage
     */
    protected function get_and_unset_message($index)
    {
        $taken = $this->published_messages[$index];

        unset($this->published_messages[$index]);

        return $taken;
    }
1515
1516
    /**
     * Registers the callback invoked by basic_return().
     *
     * @param  callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_return_listener($callback)
    {
        // Validate first so an invalid value never replaces the current listener.
        Assert::isCallable($callback);

        $this->basic_return_callback = $callback;
    }
1527
1528
    /**
     * Registers the handler called for every message nack'ed by the server,
     * with the AMQPMessage as first argument.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_nack_handler($callback)
    {
        // Validate first so an invalid value never replaces the current handler.
        Assert::isCallable($callback);

        $this->nack_handler = $callback;
    }
1539
1540
    /**
     * Registers the handler called for every message ack'ed by the server,
     * with the AMQPMessage as first argument.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_ack_handler($callback)
    {
        // Validate first so an invalid value never replaces the current handler.
        Assert::isCallable($callback);

        $this->ack_handler = $callback;
    }
1551
1552
    /**
     * Verifies that the channel has a usable connection before publishing.
     *
     * @throws AMQPChannelClosedException when there is no connection or it is not connected
     * @throws AMQPConnectionClosedException
     * @throws AMQPConnectionBlockedException when the connection reports itself as blocked
     */
    private function checkConnection()
    {
        $is_usable = $this->connection !== null && $this->connection->isConnected();
        if (!$is_usable) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }

        if ($this->connection->isBlocked()) {
            throw new AMQPConnectionBlockedException();
        }
    }
1566
}
1567