Completed
Push — master ( dafd10...eb0376 )
by Ramūnas
12:40 queued 12:38
created

AMQPChannel::prePublish()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 3.0884

Importance

Changes 0
Metric Value
dl 0
loc 31
ccs 11
cts 14
cp 0.7856
rs 9.424
c 0
b 0
f 0
cc 3
nc 3
nop 5
crap 3.0884
1
<?php
2
3
namespace PhpAmqpLib\Channel;
4
5
use PhpAmqpLib\Connection\AbstractConnection;
6
use PhpAmqpLib\Exception\AMQPBasicCancelException;
7
use PhpAmqpLib\Exception\AMQPChannelClosedException;
8
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
9
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
10
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
11
use PhpAmqpLib\Exception\AMQPRuntimeException;
12
use PhpAmqpLib\Helper\Assert;
13
use PhpAmqpLib\Message\AMQPMessage;
14
use PhpAmqpLib\Wire;
15
use PhpAmqpLib\Wire\AMQPReader;
16
use PhpAmqpLib\Wire\AMQPWriter;
17
18
class AMQPChannel extends AbstractChannel
19
{
20
    /**
     * Callbacks held by this channel; emptied by close() and do_close().
     *
     * @var callable[]
     * @internal Use is_consuming() to check whether there are active callbacks
     */
    public $callbacks = array();

    /** @var bool Whether the channel has been "opened"; set by channel_open_ok(), reset by do_close() */
    protected $is_open = false;

    /** @var int Ticket last stored by access_request_ok() */
    protected $default_ticket = 0;

    /** @var bool Flow flag last read from the peer in channel_flow() */
    protected $active = true;

    /** @var array List of array(reply_code, reply_text, details) entries appended by channel_alert() */
    protected $alerts = array();

    /** @var bool Value of the $auto_decode constructor argument */
    protected $auto_decode;

    /**
     * These parameters will be passed to the function in case of basic_return:
     *    param int $reply_code
     *    param string $reply_text
     *    param string $exchange
     *    param string $routing_key
     *    param AMQPMessage $msg
     *
     * @var null|callable
     */
    protected $basic_return_callback;

    /** @var array Used to keep track of the messages that are going to be batch published. */
    protected $batch_messages = array();

    /**
     * If the channel is in confirm_publish mode this array stores all published messages
     * until they get ack'ed or nack'ed. It is indexed by delivery tag.
     *
     * @var AMQPMessage[]
     */
    private $published_messages = array();

    /** @var int */
    private $next_delivery_tag = 0;

    /** @var null|callable Handler handed to internal_ack_handler() when the server acks */
    private $ack_handler;

    /** @var null|callable Handler handed to internal_ack_handler() when the server nacks */
    private $nack_handler;

    /**
     * Circular buffer to speed up both basic_publish() and publish_batch().
     * Max size limited by $publish_cache_max_size.
     *
     * @var array
     * @see basic_publish()
     * @see publish_batch()
     */
    private $publish_cache = array();

    /**
     * Maximal size of $publish_cache
     *
     * @var int
     */
    private $publish_cache_max_size = 100;

    /**
     * Maximum time to wait for operations on this channel, in seconds.
     * Stored exactly as passed to the constructor, whose default is the integer 0,
     * so the value is not necessarily a float.
     *
     * @var int|float
     */
    protected $channel_rpc_timeout;
95
96
    /**
     * Builds the channel and immediately tries to open it through x_open().
     *
     * If no usable channel id is given, one is requested from the connection.
     * When opening throws, close() is called and the exception is rethrown.
     *
     * @param AbstractConnection $connection
     * @param int|null $channel_id
     * @param bool $auto_decode
     * @param int|float $channel_rpc_timeout stored as-is and later passed to wait() as the timeout
     * @throws \Exception
     */
    public function __construct($connection, $channel_id = null, $auto_decode = true, $channel_rpc_timeout = 0)
    {
        // Loose comparison preserved: any value that equals null under "==" (for
        // example 0) also results in a free channel id being requested.
        if ($channel_id == null) {
            $channel_id = $connection->get_free_channel_id();
        }

        parent::__construct($connection, $channel_id);

        $this->debug->debug_msg('using channel_id: ' . $channel_id);

        $this->auto_decode = $auto_decode;
        $this->channel_rpc_timeout = $channel_rpc_timeout;

        try {
            $this->x_open();
        } catch (\Exception $open_failure) {
            // Do not leave a half-initialised channel behind.
            $this->close();

            throw $open_failure;
        }
    }
123
124
    /**
     * Reports the current value of the $is_open flag.
     *
     * @return bool
     */
    public function is_open()
    {
        return $this->is_open;
    }
131
132
    /**
     * Tear down this object, after we've agreed to close with the server.
     *
     * Removes the channel from the connection's channel list (when an id is set),
     * drops the connection and channel id references, marks the channel as closed
     * and empties the callbacks.
     */
    protected function do_close()
    {
        if (null !== $this->channel_id) {
            unset($this->connection->channels[$this->channel_id]);
        }

        $this->connection = null;
        $this->channel_id = null;
        $this->is_open = false;
        $this->callbacks = array();
    }
144
145
    /**
     * Only for AMQP0.8.0
     *
     * This method allows the server to send a non-fatal warning to the client.
     * It is used for methods that are normally asynchronous and thus do not have
     * confirmations, and for which the server may detect errors that need to be
     * reported. Fatal errors are handled as channel or connection exceptions;
     * non-fatal errors are sent through this method.
     *
     * The reply code, reply text and details table are read from the frame, in
     * that order, and appended to $alerts as one entry.
     *
     * @param AMQPReader $reader
     */
    protected function channel_alert($reader)
    {
        $code = $reader->read_short();
        $text = $reader->read_shortstr();
        $table = $reader->read_table();

        $this->alerts[] = array($code, $text, $table);
    }
163
164
    /**
     * Request a channel close
     *
     * Callbacks are always emptied first. If the channel is not open or has no
     * connection, only the local teardown (do_close()) runs and null is returned.
     * Otherwise a close frame is sent and the method waits for 'channel.close_ok'.
     * If sending the frame throws, the local teardown still runs before the
     * exception is rethrown.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig two-element array; elements 0 and 1 are passed to channelClose()
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        $this->callbacks = array();

        $nothing_to_close = $this->is_open === false || $this->connection === null;
        if ($nothing_to_close) {
            $this->do_close();

            return null; // already closed
        }

        list($frame_class, $frame_method, $payload) = $this->protocolWriter->channelClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );

        try {
            $this->send_method_frame(array($frame_class, $frame_method), $payload);
        } catch (\Exception $send_failure) {
            $this->do_close();

            throw $send_failure;
        }

        $expected = array($this->waitHelper->get_wait('channel.close_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
200
201
    /**
     * Handles a close frame received for this channel.
     *
     * Reads reply code, reply text, class id and method id from the frame, sends
     * the method frame array(20, 41) back, tears the channel down locally and then
     * always throws, carrying the values that were read.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException
     */
    protected function channel_close($reader)
    {
        $code = $reader->read_short();
        $text = $reader->read_shortstr();
        $failed_class = $reader->read_short();
        $failed_method = $reader->read_short();

        $this->send_method_frame(array(20, 41));
        $this->do_close();

        $failed_signature = array($failed_class, $failed_method);

        throw new AMQPProtocolChannelException($code, $text, $failed_signature);
    }
217
218
    /**
     * Confirm a channel close
     *
     * Does nothing beyond delegating to AMQPChannel::do_close().
     */
    protected function channel_close_ok()
    {
        $this->do_close();
    }
226
227
    /**
     * Enables/disables flow from peer
     *
     * Sends the flow frame and waits for 'channel.flow_ok'.
     *
     * @param bool $active
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function flow($active)
    {
        list($frame_class, $frame_method, $payload) = $this->protocolWriter->channelFlow($active);
        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        $expected = array($this->waitHelper->get_wait('channel.flow_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
243
244
    /**
     * Handles a flow frame from the peer: stores the received bit in $active
     * and answers through x_flow_ok() with that same value.
     *
     * @param AMQPReader $reader
     */
    protected function channel_flow($reader)
    {
        $this->active = $reader->read_bit();

        $this->x_flow_ok($this->active);
    }
252
253
    /**
     * Sends the reply to a flow frame received from the peer.
     *
     * NOTE(review): the frame is built with protocolWriter->channelFlow(), the same
     * builder flow() uses for the request - confirm that this yields the intended
     * flow-ok method id.
     *
     * @param bool $active
     */
    protected function x_flow_ok($active)
    {
        list($frame_class, $frame_method, $payload) = $this->protocolWriter->channelFlow($active);

        $this->send_method_frame(array($frame_class, $frame_method), $payload);
    }
261
262
    /**
     * Handles the confirmation of a flow request by returning the bit read from the frame.
     *
     * @param AMQPReader $reader
     * @return bool
     */
    protected function channel_flow_ok($reader)
    {
        $active = $reader->read_bit();

        return $active;
    }
270
271
    /**
     * Opens the channel unless it is already flagged as open.
     *
     * Returns null right away when $is_open is set; otherwise sends the open
     * frame and waits for 'channel.open_ok'.
     *
     * @param string $out_of_band
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    protected function x_open($out_of_band = '')
    {
        if ($this->is_open) {
            return null;
        }

        list($frame_class, $frame_method, $payload) = $this->protocolWriter->channelOpen($out_of_band);
        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        $expected = array($this->waitHelper->get_wait('channel.open_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
289
290 36
    /**
     * Handles the confirmation of a channel open: flags the channel as open
     * and writes a debug message.
     */
    protected function channel_open_ok()
    {
        $this->is_open = true;

        $this->debug->debug_msg('Channel open');
    }
296
297
    /**
     * Requests an access ticket
     *
     * All arguments are forwarded unchanged to protocolWriter->accessRequest();
     * the method then waits for 'access.request_ok'.
     *
     * @param string $realm
     * @param bool $exclusive
     * @param bool $passive
     * @param bool $active
     * @param bool $write
     * @param bool $read
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function access_request(
        $realm,
        $exclusive = false,
        $passive = false,
        $active = false,
        $write = false,
        $read = false
    ) {
        list($frame_class, $frame_method, $payload) = $this->protocolWriter->accessRequest(
            $realm,
            $exclusive,
            $passive,
            $active,
            $write,
            $read
        );

        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        $expected = array($this->waitHelper->get_wait('access.request_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
332
333
    /**
     * Grants access to server resources
     *
     * The short value read from the frame becomes the new $default_ticket
     * and is also returned.
     *
     * @param AMQPReader $reader
     * @return int the ticket now stored in $default_ticket
     */
    protected function access_request_ok($reader)
    {
        $ticket = $reader->read_short();
        $this->default_ticket = $ticket;

        return $this->default_ticket;
    }
345
346
    /**
     * Declares exchange
     *
     * The ticket is resolved through getTicket(); everything else is forwarded
     * unchanged to protocolWriter->exchangeDeclare(). Unless $nowait is set, the
     * method waits for 'exchange.declare_ok'.
     *
     * @param string $exchange
     * @param string $type
     * @param bool $passive
     * @param bool $durable
     * @param bool $auto_delete note that this defaults to true
     * @param bool $internal
     * @param bool $nowait when truthy, null is returned right after the frame is sent
     * @param array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_declare(
        $exchange,
        $type,
        $passive = false,
        $durable = false,
        $auto_delete = true,
        $internal = false,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $payload) = $this->protocolWriter->exchangeDeclare(
            $ticket,
            $exchange,
            $type,
            $passive,
            $durable,
            $auto_delete,
            $internal,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('exchange.declare_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
396
397
    /**
     * Confirms an exchange declaration
     */
    protected function exchange_declare_ok()
    {
        // Intentionally empty: nothing is read from the frame and no state changes.
    }
403
404
    /**
     * Deletes an exchange
     *
     * The ticket is resolved through getTicket(). Unless $nowait is set, the
     * method waits for 'exchange.delete_ok'.
     *
     * @param string $exchange
     * @param bool $if_unused
     * @param bool $nowait when truthy, null is returned right after the frame is sent
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_delete(
        $exchange,
        $if_unused = false,
        $nowait = false,
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $payload) = $this->protocolWriter->exchangeDelete(
            $ticket,
            $exchange,
            $if_unused,
            $nowait
        );

        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('exchange.delete_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
438
439
    /**
     * Confirms deletion of an exchange
     */
    protected function exchange_delete_ok()
    {
        // Intentionally empty: nothing is read from the frame and no state changes.
    }
445
446
    /**
     * Binds dest exchange to source exchange
     *
     * The ticket is resolved through getTicket(). Unless $nowait is set, the
     * method waits for 'exchange.bind_ok'.
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait when truthy, null is returned right after the frame is sent
     * @param array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_bind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $payload) = $this->protocolWriter->exchangeBind(
            $ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('exchange.bind_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
487
488
    /**
     * Confirms bind successful
     */
    protected function exchange_bind_ok()
    {
        // Intentionally empty: nothing is read from the frame and no state changes.
    }
494
495
    /**
     * Unbinds dest exchange from source exchange
     *
     * The ticket is resolved through getTicket(). Unless $nowait is set, the
     * method waits for 'exchange.unbind_ok'.
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait when truthy, null is returned right after the frame is sent
     * @param array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_unbind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(
            $ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        // The no-wait flag was sent to the server, so no unbind_ok will arrive.
        // Return immediately, as exchange_bind() and the other nowait-aware
        // methods of this class do, instead of waiting for a reply that never comes.
        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('exchange.unbind_ok')
        ), false, $this->channel_rpc_timeout);
    }
532
533
    /**
     * Confirms unbind successful
     */
    protected function exchange_unbind_ok()
    {
        // Intentionally empty: nothing is read from the frame and no state changes.
    }
539
540
    /**
     * Binds queue to an exchange
     *
     * The ticket is resolved through getTicket(). Unless $nowait is set, the
     * method waits for 'queue.bind_ok'.
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param bool $nowait when truthy, null is returned right after the frame is sent
     * @param array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function queue_bind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $payload) = $this->protocolWriter->queueBind(
            $ticket,
            $queue,
            $exchange,
            $routing_key,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('queue.bind_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
581
582
    /**
     * Confirms bind successful
     */
    protected function queue_bind_ok()
    {
        // Intentionally empty: nothing is read from the frame and no state changes.
    }
588
589
    /**
     * Unbind queue from an exchange
     *
     * The ticket is resolved through getTicket(). This method has no nowait
     * option and always waits for 'queue.unbind_ok'.
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function queue_unbind(
        $queue,
        $exchange,
        $routing_key = '',
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $payload) = $this->protocolWriter->queueUnbind(
            $ticket,
            $queue,
            $exchange,
            $routing_key,
            $arguments
        );

        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        $expected = array($this->waitHelper->get_wait('queue.unbind_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
623
624
    /**
     * Confirms unbind successful
     */
    protected function queue_unbind_ok()
    {
        // Intentionally empty: nothing is read from the frame and no state changes.
    }
630
631
    /**
     * Declares queue, creates if needed
     *
     * The ticket is resolved through getTicket(); everything else is forwarded
     * unchanged to protocolWriter->queueDeclare(). Unless $nowait is set, the
     * method waits for 'queue.declare_ok'.
     *
     * @param string $queue
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param bool $auto_delete note that this defaults to true
     * @param bool $nowait when truthy, null is returned right after the frame is sent
     * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return array|null
     */
    public function queue_declare(
        $queue = '',
        $passive = false,
        $durable = false,
        $exclusive = false,
        $auto_delete = true,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $payload) = $this->protocolWriter->queueDeclare(
            $ticket,
            $queue,
            $passive,
            $durable,
            $exclusive,
            $auto_delete,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('queue.declare_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
678
679
    /**
     * Confirms a queue definition
     *
     * Reads the queue name, the message count and the consumer count from the
     * frame, in that order.
     *
     * @param AMQPReader $reader
     * @return array array($queue, $message_count, $consumer_count)
     */
    protected function queue_declare_ok($reader)
    {
        $name = $reader->read_shortstr();
        $messages = $reader->read_long();
        $consumers = $reader->read_long();

        return array($name, $messages, $consumers);
    }
693
694
    /**
     * Deletes a queue
     *
     * The ticket is resolved through getTicket(). Unless $nowait is set, the
     * method waits for 'queue.delete_ok'.
     *
     * @param string $queue
     * @param bool $if_unused
     * @param bool $if_empty
     * @param bool $nowait when truthy, null is returned right after the frame is sent
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $payload) = $this->protocolWriter->queueDelete(
            $ticket,
            $queue,
            $if_unused,
            $if_empty,
            $nowait
        );

        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('queue.delete_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
727
728
    /**
     * Confirms deletion of a queue
     *
     * @param AMQPReader $reader
     * @return string the value obtained from $reader->read_long()
     */
    protected function queue_delete_ok($reader)
    {
        $value = $reader->read_long();

        return $value;
    }
738
739
    /**
     * Purges a queue
     *
     * The ticket is resolved through getTicket(). Unless $nowait is set, the
     * method waits for 'queue.purge_ok'.
     *
     * @param string $queue
     * @param bool $nowait when truthy, null is returned right after the frame is sent
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function queue_purge($queue = '', $nowait = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);

        list($frame_class, $frame_method, $payload) = $this->protocolWriter->queuePurge($ticket, $queue, $nowait);

        $this->send_method_frame(array($frame_class, $frame_method), $payload);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('queue.purge_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
763
764
    /**
     * Confirms a queue purge
     *
     * @param AMQPReader $reader
     * @return string the value obtained from $reader->read_long()
     */
    protected function queue_purge_ok($reader)
    {
        $value = $reader->read_long();

        return $value;
    }
774
775
    /**
     * Acknowledges one or more messages
     *
     * Only sends the frame; no reply is waited for.
     *
     * @param int $delivery_tag
     * @param bool $multiple
     */
    public function basic_ack($delivery_tag, $multiple = false)
    {
        list($frame_class, $frame_method, $payload) = $this->protocolWriter->basicAck($delivery_tag, $multiple);

        $this->send_method_frame(array($frame_class, $frame_method), $payload);
    }
786
787
    /**
     * Called when the server sends a basic.ack
     *
     * Reads the delivery tag and the "multiple" bit, then hands the matching
     * published message(s) to $ack_handler through internal_ack_handler().
     * $ack_handler may still be null at this point.
     *
     * @param AMQPReader $reader
     * @throws AMQPRuntimeException when the delivery tag is not among the published messages
     */
    protected function basic_ack_from_server(AMQPReader $reader)
    {
        $delivery_tag = $reader->read_longlong();
        $multiple = (bool) $reader->read_bit();

        if (isset($this->published_messages[$delivery_tag])) {
            $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);

            return;
        }

        throw new AMQPRuntimeException(sprintf(
            'Server ack\'ed unknown delivery_tag "%s"',
            $delivery_tag
        ));
    }
807
808
    /**
809
     * Called when the server sends a basic.nack
810
     *
811
     * @param AMQPReader $reader
812
     * @throws AMQPRuntimeException
813
     */
814 View Code Duplication
    protected function basic_nack_from_server($reader)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
815
    {
816
        $delivery_tag = $reader->read_longlong();
817
        $multiple = (bool) $reader->read_bit();
818
819
        if (!isset($this->published_messages[$delivery_tag])) {
820
            throw new AMQPRuntimeException(sprintf(
821
                'Server nack\'ed unknown delivery_tag "%s"',
822
                $delivery_tag
823
            ));
824
        }
825
826
        $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);
0 ignored issues
show
Bug introduced by
It seems like $this->nack_handler can also be of type null; however, PhpAmqpLib\Channel\AMQPC...:internal_ack_handler() does only seem to accept callable, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this is a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
827
    }
828
829
    /**
     * Removes confirmed messages from $this->published_messages and hands each one to $handler.
     *
     * With $multiple set, every tracked message whose key is less than or equal to
     * $delivery_tag is processed; otherwise only the message stored under $delivery_tag is.
     *
     * @param int $delivery_tag
     * @param bool $multiple
     * @param callable $handler
     */
    protected function internal_ack_handler($delivery_tag, $multiple, $handler)
    {
        if (!$multiple) {
            $confirmed = $this->get_and_unset_message($delivery_tag);
            $this->dispatch_to_handler($handler, array($confirmed));

            return;
        }

        $tags = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag);

        foreach ($tags as $tag) {
            // Every matching tag goes through the single-message path above.
            $this->internal_ack_handler($tag, false, $handler);
        }
    }
849
850
    /**
     * Collects the keys of $messages that are less than or equal to $value.
     *
     * @param AMQPMessage[] $messages Messages whose array keys are compared
     * @param int|string $value Upper bound; cast to int before the comparison
     * @return array Matching keys, in the order they appear in $messages
     */
    protected function get_keys_less_or_equal(array $messages, $value)
    {
        $limit = (int) $value;
        $matched = array();

        foreach (array_keys($messages) as $tag) {
            if ($tag <= $limit) {
                $matched[] = $tag;
            }
        }

        return $matched;
    }
875
876
    /**
     * Rejects one or several received messages
     *
     * Builds a basic.nack frame through the protocol writer and sends it on this channel.
     *
     * @param int $delivery_tag
     * @param bool $multiple
     * @param bool $requeue
     */
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
    {
        $frame = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);
    }
888
889
    /**
890
     * Ends a queue consumer
891
     *
892
     * @param string $consumer_tag
893
     * @param bool $nowait
894
     * @param bool $noreturn
895
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
896
     * @return mixed
897
     */
898 3 View Code Duplication
    public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
899
    {
900 3
        list($class_id, $method_id, $args) = $this->protocolWriter->basicCancel($consumer_tag, $nowait);
901 3
        $this->send_method_frame(array($class_id, $method_id), $args);
902
903 3
        if ($nowait || $noreturn) {
904
            unset($this->callbacks[$consumer_tag]);
905
            return $consumer_tag;
906
        }
907
908 3
        return $this->wait(array(
909 3
            $this->waitHelper->get_wait('basic.cancel_ok')
910 3
        ), false, $this->channel_rpc_timeout);
911
    }
912
913
    /**
     * Reacts to a cancel notification from the server by raising an exception.
     *
     * The short string read from $reader is handed to the exception constructor.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException always
     */
    protected function basic_cancel_from_server(AMQPReader $reader)
    {
        $cancelled = $reader->read_shortstr();

        throw new AMQPBasicCancelException($cancelled);
    }
921
922
    /**
     * Confirm a cancelled consumer
     *
     * Reads the consumer tag from $reader and drops the callback registered under it.
     *
     * @param AMQPReader $reader
     * @return string The consumer tag that was read
     */
    protected function basic_cancel_ok($reader)
    {
        $tag = $reader->read_shortstr();

        // The consumer no longer exists, so its callback entry is removed.
        unset($this->callbacks[$tag]);

        return $tag;
    }
935
936
    /**
     * Tells whether any consumer callback entry is currently registered on this channel.
     *
     * @return bool
     */
    public function is_consuming()
    {
        return empty($this->callbacks) === false;
    }
943
944
    /**
     * Start a queue consumer.
     * This method asks the server to start a "consumer", which is a transient request for messages
     * from a specific queue.
     * Consumers last as long as the channel they were declared on, or until the client cancels them.
     *
     * The callback is stored under the consumer tag: the one supplied by the caller when
     * $nowait is true, otherwise the one returned by the wait for basic.consume_ok.
     *
     * @link https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
     *
     * @param string $queue
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack
     * @param bool $exclusive
     * @param bool $nowait
     * @param callable|null $callback
     * @param int|null $ticket
     * @param array $arguments Only sent when the protocol version is AMQP 0-9-1
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @throws \InvalidArgumentException if $callback is not callable, if $nowait is used without
     *                                   a consumer tag, or if the consumer tag is already registered
     * @return string
     */
    public function basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array()
    ) {
        if (null !== $callback) {
            Assert::isCallable($callback);
        }

        // A tag counts as "given" whenever its string form is non-empty. empty() must not be
        // used here: it also reports true for the valid tag "0", which would reject
        // nowait consumers using that tag and skip the duplicate check below.
        $has_consumer_tag = ((string) $consumer_tag !== '');

        if ($nowait && !$has_consumer_tag) {
            throw new \InvalidArgumentException('Cannot start consumer without consumer_tag and no-wait=true');
        }
        if ($has_consumer_tag && array_key_exists($consumer_tag, $this->callbacks)) {
            throw new \InvalidArgumentException('This consumer tag is already registered.');
        }

        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->basicConsume(
            $ticket,
            $queue,
            $consumer_tag,
            $no_local,
            $no_ack,
            $exclusive,
            $nowait,
            $this->protocolVersion === Wire\Constants091::VERSION ? $arguments : null
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (false === $nowait) {
            // The result of the wait replaces the caller-supplied tag.
            $consumer_tag = $this->wait(array(
                $this->waitHelper->get_wait('basic.consume_ok')
            ), false, $this->channel_rpc_timeout);
        }

        $this->callbacks[$consumer_tag] = $callback;

        return $consumer_tag;
    }
1011
1012
    /**
     * Confirms a new consumer
     *
     * @param AMQPReader $reader
     * @return string The short string read from $reader
     */
    protected function basic_consume_ok($reader)
    {
        $consumer_tag = $reader->read_shortstr();

        return $consumer_tag;
    }
1022
1023
    /**
     * Notifies the client of a consumer message
     *
     * Reads the delivery fields from $reader, stores them on $message and, when a callback
     * is registered for the consumer tag, invokes it with the message.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     */
    protected function basic_deliver($reader, $message)
    {
        // The fields are read strictly in this order.
        $tag_of_consumer = $reader->read_shortstr();
        $tag_of_delivery = $reader->read_longlong();
        $was_redelivered = $reader->read_bit();
        $source_exchange = $reader->read_shortstr();
        $used_routing_key = $reader->read_shortstr();

        $message
            ->setChannel($this)
            ->setDeliveryInfo($tag_of_delivery, $was_redelivered, $source_exchange, $used_routing_key)
            ->setConsumerTag($tag_of_consumer);

        if (!isset($this->callbacks[$tag_of_consumer])) {
            // No (non-null) callback for this consumer: nothing to invoke.
            return;
        }

        call_user_func($this->callbacks[$tag_of_consumer], $message);
    }
1046
1047
    /**
     * Direct access to a queue if no message was available in the queue, return null
     *
     * Sends basic.get and then waits for either basic.get_ok or basic.get_empty.
     *
     * @param string $queue
     * @param bool $no_ack
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return AMQPMessage|null
     */
    public function basic_get($queue = '', $no_ack = false, $ticket = null)
    {
        $frame = $this->protocolWriter->basicGet($this->getTicket($ticket), $queue, $no_ack);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected_replies = array(
            $this->waitHelper->get_wait('basic.get_ok'),
            $this->waitHelper->get_wait('basic.get_empty'),
        );

        return $this->wait($expected_replies, false, $this->channel_rpc_timeout);
    }
1068
1069
    /**
     * Indicates no messages available
     *
     * Nothing is read and nothing is returned.
     */
    protected function basic_get_empty()
    {
        // Intentionally left blank.
    }
1075
1076
    /**
     * Provides client with a message
     *
     * Reads the delivery fields and the message count from $reader and stores them on $message.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     * @return AMQPMessage The same $message that was passed in
     */
    protected function basic_get_ok($reader, $message)
    {
        // The fields are read strictly in this order.
        $tag_of_delivery = $reader->read_longlong();
        $was_redelivered = $reader->read_bit();
        $source_exchange = $reader->read_shortstr();
        $used_routing_key = $reader->read_shortstr();
        $remaining = $reader->read_long();

        $message
            ->setChannel($this)
            ->setDeliveryInfo($tag_of_delivery, $was_redelivered, $source_exchange, $used_routing_key)
            ->setMessageCount($remaining);

        return $message;
    }
1098
1099
    /**
     * Returns the serialized basic.publish method frame for the given parameters.
     *
     * Frames are memoized in $this->publish_cache. When the cache grows beyond
     * $this->publish_cache_max_size, the oldest entry (first in insertion order) is dropped.
     *
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int|null $ticket
     * @return mixed The value produced by getvalue() on the prepared method frame
     */
    private function prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket)
    {
        // Both free-form strings are length-prefixed so that a "|" inside an exchange name or
        // routing key cannot make two different parameter sets share one cache key
        // (e.g. exchange "a|b" + key "c" versus exchange "a" + key "b|c").
        $cache_key = sprintf(
            '%d:%s|%d:%s|%s|%s|%s',
            mb_strlen((string) $exchange, 'ASCII'),
            $exchange,
            mb_strlen((string) $routing_key, 'ASCII'),
            $routing_key,
            $mandatory,
            $immediate,
            $ticket
        );

        if (isset($this->publish_cache[$cache_key])) {
            return $this->publish_cache[$cache_key];
        }

        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish(
            $ticket,
            $exchange,
            $routing_key,
            $mandatory,
            $immediate
        );

        $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args);
        $frame = $pkt->getvalue();
        $this->publish_cache[$cache_key] = $frame;

        if (count($this->publish_cache) > $this->publish_cache_max_size) {
            reset($this->publish_cache);
            $old_key = key($this->publish_cache);
            unset($this->publish_cache[$old_key]);
        }

        // Return the local copy: the eviction above may have removed the entry that was just
        // stored (for instance when the maximum cache size is 0).
        return $frame;
    }
1138
1139
    /**
     * Publishes a message
     *
     * The connection is checked first, then the publish method frame and the message content
     * are sent. When delivery-tag tracking is active (next_delivery_tag > 0) the message is
     * remembered in $this->published_messages under the tag it was sent with.
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int|null $ticket
     * @throws AMQPChannelClosedException
     * @throws AMQPConnectionClosedException
     * @throws AMQPConnectionBlockedException
     */
    public function basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        $this->checkConnection();

        $frame = new AMQPWriter();
        $frame->write($this->prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket));

        try {
            $this->connection->send_content(
                $this->channel_id,
                60,
                0,
                mb_strlen($msg->body, 'ASCII'),
                $msg->serialize_properties(),
                $msg->body,
                $frame
            );
        } catch (AMQPConnectionClosedException $exception) {
            // Close this channel before letting the caller see the failure.
            $this->do_close();
            throw $exception;
        }

        if ($this->next_delivery_tag > 0) {
            $tag = $this->next_delivery_tag;
            $this->published_messages[$tag] = $msg;
            $msg->setDeliveryInfo($tag, false, $exchange, $routing_key);
            $this->next_delivery_tag++;
        }
    }
1185
1186
    /**
     * Queues a message, together with its publish parameters, for a later publish_batch() call.
     *
     * Nothing is sent here; the entry is only appended to $this->batch_messages.
     *
     * @param AMQPMessage $message
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int|null $ticket
     */
    public function batch_basic_publish(
        $message,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        // Positional layout read back by publish_batch(): indexes 0 to 5.
        $entry = array($message, $exchange, $routing_key, $mandatory, $immediate, $ticket);

        $this->batch_messages[] = $entry;
    }
1211
1212
    /**
     * Publish batch
     *
     * Serializes every message queued through batch_basic_publish() into one buffer, writes
     * that buffer to the connection and then empties the queue. Returns immediately when
     * nothing is queued.
     *
     * The connection is verified before any frame is built, so a closed or blocked
     * connection leaves the delivery-tag counter, $this->published_messages and the queued
     * batch untouched.
     *
     * @return void
     * @throws AMQPChannelClosedException
     * @throws AMQPConnectionClosedException
     * @throws AMQPConnectionBlockedException
     */
    public function publish_batch()
    {
        if (empty($this->batch_messages)) {
            return;
        }

        // Must run first: the loop below dereferences $this->connection and mutates the
        // confirm-tracking state, neither of which may happen on an unusable connection.
        $this->checkConnection();

        /** @var AMQPWriter $pkt */
        $pkt = new AMQPWriter();

        foreach ($this->batch_messages as $m) {
            /** @var AMQPMessage $msg */
            $msg = $m[0];

            $exchange = isset($m[1]) ? $m[1] : '';
            $routing_key = isset($m[2]) ? $m[2] : '';
            $mandatory = isset($m[3]) ? $m[3] : false;
            $immediate = isset($m[4]) ? $m[4] : false;
            $ticket = isset($m[5]) ? $m[5] : null;
            $pkt->write($this->prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket));

            $this->connection->prepare_content(
                $this->channel_id,
                60,
                0,
                mb_strlen($msg->body, 'ASCII'),
                $msg->serialize_properties(),
                $msg->body,
                $pkt
            );

            if ($this->next_delivery_tag > 0) {
                $this->published_messages[$this->next_delivery_tag] = $msg;
                // Same bookkeeping as basic_publish(): record the tag on the message itself.
                $msg->setDeliveryInfo($this->next_delivery_tag, false, $exchange, $routing_key);
                $this->next_delivery_tag++;
            }
        }

        $this->connection->write($pkt->getvalue());
        $this->batch_messages = array();
    }
1260
1261
    /**
1262
     * Specifies QoS
1263
     *
1264
     * @param int $prefetch_size
1265
     * @param int $prefetch_count
1266
     * @param bool $a_global
1267
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1268
     * @return mixed
1269
     */
1270 View Code Duplication
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1271
    {
1272
        list($class_id, $method_id, $args) = $this->protocolWriter->basicQos(
1273
            $prefetch_size,
1274
            $prefetch_count,
1275
            $a_global
1276
        );
1277
1278
        $this->send_method_frame(array($class_id, $method_id), $args);
1279
1280
        return $this->wait(array(
1281
            $this->waitHelper->get_wait('basic.qos_ok')
1282
        ), false, $this->channel_rpc_timeout);
1283
    }
1284
1285
    /**
     * Confirms QoS request
     *
     * Nothing is read and nothing is returned.
     */
    protected function basic_qos_ok()
    {
        // Intentionally left blank.
    }
1291
1292
    /**
1293
     * Redelivers unacknowledged messages
1294
     *
1295
     * @param bool $requeue
1296
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1297
     * @return mixed
1298
     */
1299 View Code Duplication
    public function basic_recover($requeue = false)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1300
    {
1301
        list($class_id, $method_id, $args) = $this->protocolWriter->basicRecover($requeue);
1302
        $this->send_method_frame(array($class_id, $method_id), $args);
1303
1304
        return $this->wait(array(
1305
            $this->waitHelper->get_wait('basic.recover_ok')
1306
        ), false, $this->channel_rpc_timeout);
1307
    }
1308
1309
    /**
     * Confirm the requested recover
     *
     * Nothing is read and nothing is returned.
     */
    protected function basic_recover_ok()
    {
        // Intentionally left blank.
    }
1315
1316
    /**
     * Rejects an incoming message
     *
     * Builds a basic.reject frame through the protocol writer and sends it on this channel.
     *
     * @param int $delivery_tag
     * @param bool $requeue
     */
    public function basic_reject($delivery_tag, $requeue)
    {
        $frame = $this->protocolWriter->basicReject($delivery_tag, $requeue);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);
    }
1327
1328
    /**
     * Returns a failed message
     *
     * When a callable return listener is set, the reply code, reply text, exchange and
     * routing key are read from $reader and passed to it together with $message.
     * Without a callable listener only a debug message is emitted and $reader is not read.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     */
    protected function basic_return($reader, $message)
    {
        $listener = $this->basic_return_callback;

        if (is_callable($listener)) {
            // The fields are read strictly in this order.
            $reply_code = $reader->read_short();
            $reply_text = $reader->read_shortstr();
            $exchange = $reader->read_shortstr();
            $routing_key = $reader->read_shortstr();

            call_user_func($listener, $reply_code, $reply_text, $exchange, $routing_key, $message);
        } else {
            $this->debug->debug_msg('Skipping unhandled basic_return message');
        }

        return null;
    }
1355
1356
    /**
1357
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1358
     * @return mixed
1359
     */
1360 View Code Duplication
    public function tx_commit()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1361
    {
1362
        $this->send_method_frame(array(90, 20));
1363
1364
        return $this->wait(array(
1365
            $this->waitHelper->get_wait('tx.commit_ok')
1366
        ), false, $this->channel_rpc_timeout);
1367
    }
1368
1369
    /**
     * Confirms a successful commit
     *
     * Nothing is read and nothing is returned.
     */
    protected function tx_commit_ok()
    {
        // Intentionally left blank.
    }
1375
1376
    /**
1377
     * Rollbacks the current transaction
1378
     *
1379
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1380
     * @return mixed
1381
     */
1382 View Code Duplication
    public function tx_rollback()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1383
    {
1384
        $this->send_method_frame(array(90, 30));
1385
1386
        return $this->wait(array(
1387
            $this->waitHelper->get_wait('tx.rollback_ok')
1388
        ), false, $this->channel_rpc_timeout);
1389
    }
1390
1391
    /**
     * Confirms a successful rollback
     *
     * Nothing is read and nothing is returned.
     */
    protected function tx_rollback_ok()
    {
        // Intentionally left blank.
    }
1397
1398
    /**
     * Puts the channel into confirm mode
     * Beware that only non-transactional channels may be put into confirm mode and vice versa
     *
     * After the request has been sent (and, unless $nowait is set, confirm.select_ok has
     * been awaited) the delivery-tag counter is set to 1 so that subsequently published
     * messages are tracked in $this->published_messages.
     *
     * @param bool $nowait When true, no confirm.select_ok reply is awaited
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return null
     */
    public function confirm_select($nowait = false)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (!$nowait) {
            $this->wait(array(
                $this->waitHelper->get_wait('confirm.select_ok')
            ), false, $this->channel_rpc_timeout);
        }

        // Tracking must start in the no-wait case as well: the request has been sent either
        // way, and with the counter left at 0 published messages would never be recorded,
        // so later confirmations would refer to delivery tags this channel does not know.
        $this->next_delivery_tag = 1;

        return null;
    }
1420
1421
    /**
     * Confirms a selection
     *
     * Nothing is read and nothing is returned.
     */
    public function confirm_select_ok()
    {
        // Intentionally left blank.
    }
1427
1428
    /**
1429
     * Waits for pending acks and nacks from the server.
1430
     * If there are no pending acks, the method returns immediately
1431
     *
1432
     * @param int|float $timeout Waits until $timeout value is reached
1433
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
1434
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
1435
     */
1436 2 View Code Duplication
    public function wait_for_pending_acks($timeout = 0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1437
    {
1438
        $functions = array(
1439 2
            $this->waitHelper->get_wait('basic.ack'),
1440 2
            $this->waitHelper->get_wait('basic.nack'),
1441
        );
1442 2
        $timeout = max(0, $timeout);
1443 2
        while (!empty($this->published_messages)) {
1444 2
            $this->wait($functions, false, $timeout);
1445
        }
1446 2
    }
1447
1448
    /**
1449
     * Waits for pending acks, nacks and returns from the server.
1450
     * If there are no pending acks, the method returns immediately.
1451
     *
1452
     * @param int|float $timeout If set to value > 0 the method will wait at most $timeout seconds for pending acks.
1453
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
1454
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
1455
     */
1456 1 View Code Duplication
    public function wait_for_pending_acks_returns($timeout = 0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1457
    {
1458
        $functions = array(
1459 1
            $this->waitHelper->get_wait('basic.ack'),
1460 1
            $this->waitHelper->get_wait('basic.nack'),
1461 1
            $this->waitHelper->get_wait('basic.return'),
1462
        );
1463
1464 1
        $timeout = max(0, $timeout);
1465 1
        while (!empty($this->published_messages)) {
1466 1
            $this->wait($functions, false, $timeout);
1467
        }
1468 1
    }
1469
1470
    /**
1471
     * Selects standard transaction mode
1472
     *
1473
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
1474
     * @return mixed
1475
     */
1476 View Code Duplication
    public function tx_select()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1477
    {
1478
        $this->send_method_frame(array(90, 10));
1479
1480
        return $this->wait(array(
1481
            $this->waitHelper->get_wait('tx.select_ok')
1482
        ), false, $this->channel_rpc_timeout);
1483
    }
1484
1485
    /**
     * Confirms transaction mode
     *
     * Nothing is read and nothing is returned.
     */
    protected function tx_select_ok()
    {
        // Intentionally left blank.
    }
1491
1492
    /**
     * Resolves the ticket to use: the given one, or the channel default when null is passed.
     *
     * @param int|null $ticket
     * @return int
     */
    protected function getTicket($ticket)
    {
        if ($ticket === null) {
            return $this->default_ticket;
        }

        return $ticket;
    }
1500
1501
    /**
     * Helper method to get a particular message from $this->published_messages; the entry is
     * removed from the array and returned.
     *
     * @param int $index Key of the entry in $this->published_messages
     * @return AMQPMessage
     */
    protected function get_and_unset_message($index)
    {
        $stored = $this->published_messages[$index];

        unset($this->published_messages[$index]);

        return $stored;
    }
1514
1515
    /**
     * Sets callback for basic_return
     *
     * The callback is validated first and only stored when it is callable.
     *
     * @param  callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_return_listener($callback)
    {
        // Validation comes first so an invalid value never replaces the current listener.
        Assert::isCallable($callback);

        $this->basic_return_callback = $callback;
    }
1526
1527
    /**
     * Sets a handler which is called for any message nack'ed by the server, with the
     * AMQPMessage as first argument.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException
     */
    public function set_nack_handler($callback)
    {
        // Validation comes first so an invalid value never replaces the current handler.
        Assert::isCallable($callback);

        $this->nack_handler = $callback;
    }
1538
1539
    /**
     * Sets a handler which is called for any message ack'ed by the server, with the
     * AMQPMessage as first argument.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException
     */
    public function set_ack_handler($callback)
    {
        // Validation comes first so an invalid value never replaces the current handler.
        Assert::isCallable($callback);

        $this->ack_handler = $callback;
    }
1550
1551
    /**
     * Verifies that this channel's connection can be used for publishing.
     *
     * @throws AMQPChannelClosedException if there is no connection or it is not connected
     * @throws AMQPConnectionClosedException
     * @throws AMQPConnectionBlockedException if the connection reports itself as blocked
     */
    private function checkConnection()
    {
        $connection = $this->connection;

        $usable = ($connection !== null) && $connection->isConnected();
        if (!$usable) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }

        if ($connection->isBlocked()) {
            throw new AMQPConnectionBlockedException();
        }
    }
1565
}
1566