Completed
Push — master ( ff1afe...c244f9 )
by
unknown
16:11 queued 14:40
created

AMQPChannel::get_and_unset_message()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 7
ccs 4
cts 4
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
namespace PhpAmqpLib\Channel;
3
4
use PhpAmqpLib\Exception\AMQPBasicCancelException;
5
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
6
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPWriter;
11
12
class AMQPChannel extends AbstractChannel
13
{
14
    /**
     * Consumer callbacks registered on this channel; reset to an empty array
     * by close() and do_close().
     *
     * @var callable[]
     * @internal Use is_consuming() to check if there are active callbacks
     */
    public $callbacks = array();

    /** @var bool Whether or not the channel has been "opened" (set by channel_open_ok(), cleared by do_close()) */
    protected $is_open = false;

    /** @var int Ticket last stored by access_request_ok(); 0 until then */
    protected $default_ticket = 0;

    /** @var bool Flow state last read from the peer in channel_flow() */
    protected $active = true;

    /** @var array List of array(reply_code, reply_text, details) entries appended by channel_alert() */
    protected $alerts = array();

    /** @var bool Value of the $auto_decode constructor argument */
    protected $auto_decode;

    /**
     * These parameters will be passed to function in case of basic_return:
     *    param int $reply_code
     *    param string $reply_text
     *    param string $exchange
     *    param string $routing_key
     *    param AMQPMessage $msg
     *
     * @var callable|null
     */
    protected $basic_return_callback;

    /** @var array Used to keep track of the messages that are going to be batch published. */
    protected $batch_messages = array();

    /**
     * If the channel is in confirm_publish mode this array will store all published messages
     * until they get ack'ed or nack'ed. Entries are looked up by delivery tag.
     *
     * @var AMQPMessage[]
     */
    private $published_messages = array();

    /** @var int */
    private $next_delivery_tag = 0;

    /** @var callable|null Handler handed to internal_ack_handler() when the server sends basic.ack */
    private $ack_handler;

    /** @var callable|null Handler handed to internal_ack_handler() when the server sends basic.nack */
    private $nack_handler;

    /**
     * Circular buffer to speed up both basic_publish() and publish_batch().
     * Max size limited by $publish_cache_max_size.
     *
     * @var array
     * @see basic_publish()
     * @see publish_batch()
     */
    private $publish_cache = array();

    /**
     * Maximal size of $publish_cache
     *
     * @var int
     */
    private $publish_cache_max_size = 100;

    /**
     * Maximum time to wait for operations on this channel, in seconds.
     * Stored as received by the constructor (whose default is the integer 0)
     * and passed as the timeout argument of wait().
     *
     * @var int|float
     */
    private $channel_rpc_timeout;
89
90
    /**
     * Builds the channel on top of the parent constructor and opens it right away.
     *
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
     * @param int|null $channel_id When loosely equal to null, a free id is requested from $connection
     * @param bool $auto_decode
     * @param int|float $channel_rpc_timeout Timeout, in seconds, later passed to wait() by the RPC-style methods
     * @throws \Exception Anything raised by x_open(), re-thrown after close() has been called
     */
    public function __construct($connection, $channel_id = null, $auto_decode = true, $channel_rpc_timeout = 0)
    {
        // Loose comparison is intentional here: it is what the channel has always done.
        $channel_id = ($channel_id == null) ? $connection->get_free_channel_id() : $channel_id;

        parent::__construct($connection, $channel_id);

        $this->debug->debug_msg('using channel_id: ' . $channel_id);

        $this->auto_decode = $auto_decode;
        $this->channel_rpc_timeout = $channel_rpc_timeout;

        try {
            $this->x_open();
        } catch (\Exception $open_error) {
            // Opening failed: tear the channel down before reporting the error.
            $this->close();
            throw $open_error;
        }
    }
117
118
    /**
     * Reports the current value of the $is_open flag.
     *
     * @return bool
     */
    public function is_open()
    {
        $open = $this->is_open;

        return $open;
    }
125
126
    /**
     * Tear down this object, after we've agreed to close with the server.
     *
     * Removes the channel from the connection's channel list (when an id is
     * still set), then drops the connection/id references, marks the channel
     * as not open and clears the callbacks.
     */
    protected function do_close()
    {
        if (null !== $this->channel_id) {
            unset($this->connection->channels[$this->channel_id]);
        }

        $this->connection = null;
        $this->channel_id = null;
        $this->is_open = false;
        $this->callbacks = array();
    }
138
139
    /**
     * Only for AMQP0.8.0
     * This method allows the server to send a non-fatal warning to
     * the client.  This is used for methods that are normally
     * asynchronous and thus do not have confirmations, and for which
     * the server may detect errors that need to be reported.  Fatal
     * errors are handled as channel or connection exceptions; non-
     * fatal errors are sent through this method.
     *
     * The alert is stored in $this->alerts as array(reply_code, reply_text, details).
     *
     * @param AMQPReader $reader
     */
    protected function channel_alert($reader)
    {
        // Fields are read in wire order: short, short string, table.
        $alert = array(
            $reader->read_short(),
            $reader->read_shortstr(),
            $reader->read_table(),
        );

        $this->alerts[] = $alert;
    }
157
158
    /**
     * Request a channel close
     *
     * Callbacks are cleared first. If the channel is not open or has no
     * connection, only the local teardown (do_close()) runs and null is
     * returned. Otherwise a channel.close frame is sent and the method waits
     * for 'channel.close_ok'.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig Pair whose elements 0 and 1 are forwarded to protocolWriter->channelClose()
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @throws \Exception Re-thrown from send_method_frame() after do_close() has run
     * @return mixed
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        $this->callbacks = array();

        $already_closed = ($this->is_open === false || $this->connection === null);
        if ($already_closed) {
            $this->do_close();

            return null;
        }

        $frame = $this->protocolWriter->channelClose($reply_code, $reply_text, $method_sig[0], $method_sig[1]);
        list($class_id, $method_id, $args) = $frame;

        try {
            $this->send_method_frame(array($class_id, $method_id), $args);
        } catch (\Exception $send_error) {
            // The close request could not be sent: clean up locally, then report.
            $this->do_close();

            throw $send_error;
        }

        $expected = array($this->waitHelper->get_wait('channel.close_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
194
195
    /**
     * Handles a close initiated by the peer: reads the reason, replies,
     * tears the channel down and reports the reason as an exception.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException always, carrying the reply code/text and
     *         the (class id, method id) pair read from the frame
     */
    protected function channel_close($reader)
    {
        $reply_code = $reader->read_short();
        $reply_text = $reader->read_shortstr();
        // Class id first, then method id, in the order they appear in the frame.
        $failed_method = array($reader->read_short(), $reader->read_short());

        // NOTE(review): (20, 41) is presumably the channel.close-ok signature - confirm against the protocol tables.
        $this->send_method_frame(array(20, 41));
        $this->do_close();

        throw new AMQPProtocolChannelException($reply_code, $reply_text, $failed_method);
    }
211
212
    /**
     * Confirm a channel close
     * Alias of AMQPChannel::do_close()
     *
     * @param AMQPReader $reader Not read by this handler; presumably kept so all
     *        frame handlers share one signature - TODO confirm against the dispatcher
     */
    protected function channel_close_ok($reader)
    {
        // Nothing to decode: the confirmation only triggers the local teardown.
        $this->do_close();
    }
222
223
    /**
     * Enables/disables flow from peer
     *
     * Sends the frame built by protocolWriter->channelFlow() and waits for
     * 'channel.flow_ok'.
     *
     * @param bool $active
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function flow($active)
    {
        $frame = $this->protocolWriter->channelFlow($active);
        list($class_id, $method_id, $args) = $frame;
        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array($this->waitHelper->get_wait('channel.flow_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
239
240
    /**
     * Handles a channel.flow frame from the peer: records the requested flow
     * state in $this->active and answers through x_flow_ok().
     *
     * @param AMQPReader $reader
     */
    protected function channel_flow($reader)
    {
        $requested = $reader->read_bit();

        $this->active = $requested;
        $this->x_flow_ok($requested);
    }
248
249
    /**
     * Confirms a channel.flow request received from the peer.
     *
     * The reply has to be a channel.flow-ok frame (class 20, method 21).
     * protocolWriter->channelFlow() is the builder flow() uses to *request* a
     * flow change, so using it here would send the peer a new request instead
     * of the confirmation; the frame is therefore assembled directly.
     *
     * @param bool $active Flow state being confirmed
     */
    protected function x_flow_ok($active)
    {
        // channel.flow-ok carries a single bit: the flow state now in effect.
        $args = new AMQPWriter();
        $args->write_bits(array($active));

        $this->send_method_frame(array(20, 21), $args);
    }
257
258
    /**
     * Handles the peer's channel.flow_ok reply by returning the bit it carries.
     *
     * @param AMQPReader $reader
     * @return bool
     */
    protected function channel_flow_ok($reader)
    {
        $confirmed = $reader->read_bit();

        return $confirmed;
    }
266
267
    /**
     * Opens the channel unless it is already flagged as open.
     *
     * Sends the frame built by protocolWriter->channelOpen() and waits for
     * 'channel.open_ok'. Returns null straight away when $is_open is set.
     *
     * @param string $out_of_band
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    protected function x_open($out_of_band = '')
    {
        if (!$this->is_open) {
            $frame = $this->protocolWriter->channelOpen($out_of_band);
            list($class_id, $method_id, $args) = $frame;
            $this->send_method_frame(array($class_id, $method_id), $args);

            $expected = array($this->waitHelper->get_wait('channel.open_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
285
286
    /**
     * Handles channel.open_ok: flags the channel as open and logs it.
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function channel_open_ok($reader)
    {
        $this->is_open = true;
        $this->debug->debug_msg('Channel open');
    }
295
296
    /**
     * Requests an access ticket
     *
     * Sends the frame built by protocolWriter->accessRequest() and waits for
     * 'access.request_ok'.
     *
     * @param string $realm
     * @param bool $exclusive
     * @param bool $passive
     * @param bool $active
     * @param bool $write
     * @param bool $read
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function access_request(
        $realm,
        $exclusive = false,
        $passive = false,
        $active = false,
        $write = false,
        $read = false
    ) {
        $frame = $this->protocolWriter->accessRequest($realm, $exclusive, $passive, $active, $write, $read);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array($this->waitHelper->get_wait('access.request_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
331
332
    /**
     * Grants access to server resources
     *
     * Stores the ticket read from the frame in $this->default_ticket and returns it.
     *
     * @param AMQPReader $reader
     * @return mixed Value produced by $reader->read_short() (the original annotation said string)
     */
    protected function access_request_ok($reader)
    {
        $ticket = $reader->read_short();
        $this->default_ticket = $ticket;

        return $ticket;
    }
344
345
    /**
     * Declares exchange
     *
     * The ticket goes through getTicket() first. The frame built by
     * protocolWriter->exchangeDeclare() is sent; unless $nowait is truthy the
     * method then waits for 'exchange.declare_ok'.
     *
     * @param string $exchange
     * @param string $type
     * @param bool $passive
     * @param bool $durable
     * @param bool $auto_delete
     * @param bool $internal
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is truthy
     */
    public function exchange_declare(
        $exchange,
        $type,
        $passive = false,
        $durable = false,
        $auto_delete = true,
        $internal = false,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->exchangeDeclare(
            $ticket,
            $exchange,
            $type,
            $passive,
            $durable,
            $auto_delete,
            $internal,
            $nowait,
            $arguments
        );
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('exchange.declare_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
395
396
    /**
     * Confirms an exchange declaration
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function exchange_declare_ok($reader)
    {
        // Intentionally empty: the confirmation needs no processing here.
    }
403
404
    /**
     * Deletes an exchange
     *
     * The ticket goes through getTicket() first. The frame built by
     * protocolWriter->exchangeDelete() is sent; unless $nowait is truthy the
     * method then waits for 'exchange.delete_ok'.
     *
     * @param string $exchange
     * @param bool $if_unused
     * @param bool $nowait
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is truthy
     */
    public function exchange_delete(
        $exchange,
        $if_unused = false,
        $nowait = false,
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->exchangeDelete($ticket, $exchange, $if_unused, $nowait);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('exchange.delete_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
438
439
    /**
     * Confirms deletion of an exchange
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function exchange_delete_ok($reader)
    {
        // Intentionally empty: the confirmation needs no processing here.
    }
447
448
    /**
     * Binds dest exchange to source exchange
     *
     * The ticket goes through getTicket() first. The frame built by
     * protocolWriter->exchangeBind() is sent; unless $nowait is truthy the
     * method then waits for 'exchange.bind_ok'.
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is truthy
     */
    public function exchange_bind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->exchangeBind(
            $ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('exchange.bind_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
489
490
    /**
     * Confirms bind successful
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function exchange_bind_ok($reader)
    {
        // Intentionally empty: the confirmation needs no processing here.
    }
497
498
    /**
     * Unbinds dest exchange from source exchange
     *
     * The ticket goes through getTicket() first and the frame built by
     * protocolWriter->exchangeUnbind() is sent. When $nowait is truthy the
     * method returns null right after sending, exactly as exchange_bind()
     * does: the no-wait flag was forwarded to the server, so blocking on
     * 'exchange.unbind_ok' would wait for a reply that was explicitly not
     * requested. Otherwise it waits for 'exchange.unbind_ok'.
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is truthy
     */
    public function exchange_unbind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(
            $ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        // Fire-and-forget mode: do not block on a confirmation that was not asked for.
        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('exchange.unbind_ok')
        ), false, $this->channel_rpc_timeout);
    }
535
536
    /**
     * Confirms unbind successful
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function exchange_unbind_ok($reader)
    {
        // Intentionally empty: the confirmation needs no processing here.
    }
544
545
    /**
     * Binds queue to an exchange
     *
     * The ticket goes through getTicket() first. The frame built by
     * protocolWriter->queueBind() is sent; unless $nowait is truthy the
     * method then waits for 'queue.bind_ok'.
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is truthy
     */
    public function queue_bind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->queueBind(
            $ticket,
            $queue,
            $exchange,
            $routing_key,
            $nowait,
            $arguments
        );
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('queue.bind_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
586
587
    /**
     * Confirms bind successful
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function queue_bind_ok($reader)
    {
        // Intentionally empty: the confirmation needs no processing here.
    }
595
596
    /**
     * Unbind queue from an exchange
     *
     * The ticket goes through getTicket() first. The frame built by
     * protocolWriter->queueUnbind() is sent and the method waits for
     * 'queue.unbind_ok' (this method takes no nowait flag).
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function queue_unbind(
        $queue,
        $exchange,
        $routing_key = '',
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->queueUnbind($ticket, $queue, $exchange, $routing_key, $arguments);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array($this->waitHelper->get_wait('queue.unbind_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
630
631
    /**
     * Confirms unbind successful
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function queue_unbind_ok($reader)
    {
        // Intentionally empty: the confirmation needs no processing here.
    }
638
639
    /**
     * Declares queue, creates if needed
     *
     * The ticket goes through getTicket() first. The frame built by
     * protocolWriter->queueDeclare() is sent; unless $nowait is truthy the
     * method then waits for 'queue.declare_ok'.
     *
     * @param string $queue
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param bool $auto_delete
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is truthy
     */
    public function queue_declare(
        $queue = '',
        $passive = false,
        $durable = false,
        $exclusive = false,
        $auto_delete = true,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->queueDeclare(
            $ticket,
            $queue,
            $passive,
            $durable,
            $exclusive,
            $auto_delete,
            $nowait,
            $arguments
        );
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('queue.declare_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
686
687
    /**
     * Confirms a queue definition
     *
     * @param AMQPReader $reader
     * @return array array(queue name, message count, consumer count), read in that order
     */
    protected function queue_declare_ok($reader)
    {
        // Array elements are evaluated left to right, matching the wire order.
        return array(
            $reader->read_shortstr(),
            $reader->read_long(),
            $reader->read_long(),
        );
    }
701
702
    /**
     * Deletes a queue
     *
     * The ticket goes through getTicket() first. The frame built by
     * protocolWriter->queueDelete() is sent; unless $nowait is truthy the
     * method then waits for 'queue.delete_ok'.
     *
     * @param string $queue
     * @param bool $if_unused
     * @param bool $if_empty
     * @param bool $nowait
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is truthy
     */
    public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->queueDelete($ticket, $queue, $if_unused, $if_empty, $nowait);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('queue.delete_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
735
736
    /**
     * Confirms deletion of a queue
     *
     * @param AMQPReader $reader
     * @return mixed Value produced by $reader->read_long() (the original annotation said string;
     *         presumably the number of deleted messages - TODO confirm)
     */
    protected function queue_delete_ok($reader)
    {
        $count = $reader->read_long();

        return $count;
    }
746
747
    /**
     * Purges a queue
     *
     * The ticket goes through getTicket() first. The frame built by
     * protocolWriter->queuePurge() is sent; unless $nowait is truthy the
     * method then waits for 'queue.purge_ok'.
     *
     * @param string $queue
     * @param bool $nowait
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is truthy
     */
    public function queue_purge($queue = '', $nowait = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->queuePurge($ticket, $queue, $nowait);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('queue.purge_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        return null;
    }
771
772
    /**
     * Confirms a queue purge
     *
     * @param AMQPReader $reader
     * @return mixed Value produced by $reader->read_long() (the original annotation said string;
     *         presumably the number of purged messages - TODO confirm)
     */
    protected function queue_purge_ok($reader)
    {
        $count = $reader->read_long();

        return $count;
    }
782
783
    /**
     * Acknowledges one or more messages
     *
     * Sends the frame built by protocolWriter->basicAck(); no reply is awaited.
     *
     * @param string $delivery_tag
     * @param bool $multiple
     */
    public function basic_ack($delivery_tag, $multiple = false)
    {
        $frame = $this->protocolWriter->basicAck($delivery_tag, $multiple);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);
    }
794
795
    /**
     * Called when the server sends a basic.ack
     *
     * Reads the delivery tag and the "multiple" bit, then hands the matching
     * published message(s) to $this->ack_handler via internal_ack_handler().
     *
     * @param AMQPReader $reader
     * @throws AMQPRuntimeException when the delivery tag is not a key of $this->published_messages
     */
    protected function basic_ack_from_server(AMQPReader $reader)
    {
        $delivery_tag = $reader->read_longlong();
        $multiple = (bool) $reader->read_bit();

        if (isset($this->published_messages[$delivery_tag])) {
            $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);

            return;
        }

        throw new AMQPRuntimeException(sprintf(
            'Server ack\'ed unknown delivery_tag "%s"',
            $delivery_tag
        ));
    }
815
816
    /**
     * Called when the server sends a basic.nack
     *
     * Reads the delivery tag and the "multiple" bit, then hands the matching
     * published message(s) to $this->nack_handler via internal_ack_handler().
     *
     * @param AMQPReader $reader
     * @throws AMQPRuntimeException when the delivery tag is not a key of $this->published_messages
     */
    protected function basic_nack_from_server($reader)
    {
        $delivery_tag = $reader->read_longlong();
        $multiple = (bool) $reader->read_bit();

        if (isset($this->published_messages[$delivery_tag])) {
            $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);

            return;
        }

        throw new AMQPRuntimeException(sprintf(
            'Server nack\'ed unknown delivery_tag "%s"',
            $delivery_tag
        ));
    }
836
837
    /**
     * Removes acknowledged messages from $this->published_messages and
     * dispatches each of them to $handler.
     *
     * With $multiple set, every stored key that get_keys_less_or_equal()
     * reports for $delivery_tag is processed one by one; otherwise only the
     * message stored under $delivery_tag is.
     *
     * @param string $delivery_tag
     * @param bool $multiple
     * @param callable $handler
     */
    protected function internal_ack_handler($delivery_tag, $multiple, $handler)
    {
        if (!$multiple) {
            $message = $this->get_and_unset_message($delivery_tag);
            // Expose the tag on the message before handing it to the handler.
            $message->delivery_info['delivery_tag'] = $delivery_tag;
            $this->dispatch_to_handler($handler, array($message));

            return;
        }

        $covered = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag);
        foreach ($covered as $tag) {
            $this->internal_ack_handler($tag, false, $handler);
        }
    }
859
860
    /**
     * Collects the keys of $messages that are numerically less than or equal to $value.
     *
     * @param AMQPMessage[] $messages
     * @param string $value
     * @return mixed list of matching keys, in the order they appear in $messages
     */
    protected function get_keys_less_or_equal(array $messages, $value)
    {
        $matching = array();

        foreach (array_keys($messages) as $key) {
            // bccomp() is called with scale 0, so only the integer part is compared.
            if (bccomp($key, $value, 0) > 0) {
                continue;
            }

            $matching[] = $key;
        }

        return $matching;
    }
885
886
    /**
     * Rejects one or several received messages.
     *
     * Builds a basic.nack frame through the protocol writer and sends it; no reply is awaited.
     *
     * @param string $delivery_tag
     * @param bool $multiple
     * @param bool $requeue
     */
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
    {
        $frame = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);
    }
898
899
    /**
     * Ends a queue consumer.
     *
     * Sends basic.cancel for $consumer_tag. Unless $nowait or $noreturn is set, the call
     * then waits for basic.cancel_ok and returns the result of that wait; otherwise the
     * callback registered for the tag is dropped immediately and the tag is returned.
     *
     * @param string $consumer_tag
     * @param bool $nowait
     * @param bool $noreturn
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->basicCancel($consumer_tag, $nowait);
        $this->send_method_frame(array($class_id, $method_id), $args);

        $expect_reply = !$nowait && !$noreturn;
        if ($expect_reply) {
            $expected = array($this->waitHelper->get_wait('basic.cancel_ok'));

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        unset($this->callbacks[$consumer_tag]);

        return $consumer_tag;
    }
922
923
    /**
     * Handles a basic.cancel initiated by the server by raising an exception.
     *
     * The short string read from the frame (presumably the consumer tag) is passed
     * to the exception.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException always
     */
    protected function basic_cancel_from_server(AMQPReader $reader)
    {
        $tag = $reader->read_shortstr();

        throw new AMQPBasicCancelException($tag);
    }
931
932
    /**
     * Confirms a cancelled consumer.
     *
     * Drops the callback registered for the consumer tag carried by the frame.
     *
     * @param AMQPReader $reader
     * @return string the consumer tag that was read from the frame
     */
    protected function basic_cancel_ok($reader)
    {
        $cancelled_tag = $reader->read_shortstr();

        unset($this->callbacks[$cancelled_tag]);

        return $cancelled_tag;
    }
945
946
    /**
     * Tells whether at least one entry is registered in $this->callbacks.
     *
     * @return bool
     */
    public function is_consuming()
    {
        // A non-empty array casts to true, an empty one to false.
        return (bool) $this->callbacks;
    }
953
954
    /**
     * Starts a queue consumer.
     *
     * Sends basic.consume and, when $nowait is strictly false, waits for
     * basic.consume_ok and uses the result of that wait as the consumer tag.
     * $callback is then stored in $this->callbacks under the consumer tag.
     *
     * @param string $queue
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack
     * @param bool $exclusive
     * @param bool $nowait
     * @param callable|null $callback
     * @param int|null $ticket falls back to the channel's default ticket when null
     * @param array $arguments only forwarded when the protocol version is '0.9.1'
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|string
     */
    public function basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array()
    ) {
        $ticket = $this->getTicket($ticket);

        $extra_arguments = null;
        if ($this->protocolVersion == '0.9.1') {
            $extra_arguments = $arguments;
        }

        $frame = $this->protocolWriter->basicConsume(
            $ticket,
            $queue,
            $consumer_tag,
            $no_local,
            $no_ack,
            $exclusive,
            $nowait,
            $extra_arguments
        );
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        // Strict comparison on purpose: only a literal false triggers the wait.
        if ($nowait === false) {
            $expected = array($this->waitHelper->get_wait('basic.consume_ok'));
            $consumer_tag = $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        $this->callbacks[$consumer_tag] = $callback;

        return $consumer_tag;
    }
1004
1005
    /**
     * Confirms a new consumer.
     *
     * @param AMQPReader $reader
     * @return string the short string read from the frame
     */
    protected function basic_consume_ok($reader)
    {
        $consumer_tag = $reader->read_shortstr();

        return $consumer_tag;
    }
1015
1016
    /**
     * Notifies the client of a consumer message.
     *
     * Fills $message->delivery_info from the frame and, if a callback is registered
     * for the consumer tag, invokes it with the message.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     */
    protected function basic_deliver($reader, $message)
    {
        // The fields are read in the order in which they appear in the frame.
        $info = array('channel' => $this);
        $info['consumer_tag'] = $reader->read_shortstr();
        $info['delivery_tag'] = $reader->read_longlong();
        $info['redelivered'] = $reader->read_bit();
        $info['exchange'] = $reader->read_shortstr();
        $info['routing_key'] = $reader->read_shortstr();

        $message->delivery_info = $info;

        $consumer_tag = $info['consumer_tag'];
        if (!isset($this->callbacks[$consumer_tag])) {
            return;
        }

        call_user_func($this->callbacks[$consumer_tag], $message);
    }
1043
1044
    /**
     * Direct access to a queue.
     *
     * Sends basic.get and waits for either basic.get_ok or basic.get_empty;
     * the result of that wait is returned.
     *
     * @param string $queue
     * @param bool $no_ack
     * @param int $ticket falls back to the channel's default ticket when null
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_get($queue = '', $no_ack = false, $ticket = null)
    {
        $frame = $this->protocolWriter->basicGet($this->getTicket($ticket), $queue, $no_ack);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array(
            $this->waitHelper->get_wait('basic.get_ok'),
            $this->waitHelper->get_wait('basic.get_empty'),
        );

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1065
1066
    /**
     * Indicates no messages available.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function basic_get_empty($reader)
    {
        // Intentionally empty: nothing is read from $reader and nothing is returned.
    }
1074
1075
    /**
     * Provides the client with a message.
     *
     * Fills $message->delivery_info from the frame and returns the same message object.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     * @return AMQPMessage
     */
    protected function basic_get_ok($reader, $message)
    {
        // The fields are read in the order in which they appear in the frame.
        $info = array();
        $info['delivery_tag'] = $reader->read_longlong();
        $info['redelivered'] = $reader->read_bit();
        $info['exchange'] = $reader->read_shortstr();
        $info['routing_key'] = $reader->read_shortstr();
        $info['message_count'] = $reader->read_long();

        $message->delivery_info = $info;

        return $message;
    }
1100
1101
    /**
     * Returns the serialized basic.publish method frame for the given parameters,
     * building it on first use and caching it in $this->publish_cache.
     *
     * The cache key length-prefixes every parameter, so values containing the "|"
     * separator cannot collide with a different parameter combination (for example
     * exchange "a|b" + routing key "c" versus exchange "a" + routing key "b|c").
     *
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int $ticket falls back to the channel's default ticket when null
     * @return mixed the value returned by getvalue() on the prepared method frame
     */
    private function pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket)
    {
        $key_parts = array();
        foreach (array($exchange, $routing_key, $mandatory, $immediate, $ticket) as $part) {
            $part = (string) $part;
            $key_parts[] = strlen($part) . ':' . $part;
        }
        $cache_key = implode('|', $key_parts);

        if (isset($this->publish_cache[$cache_key])) {
            return $this->publish_cache[$cache_key];
        }

        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish(
            $ticket,
            $exchange,
            $routing_key,
            $mandatory,
            $immediate
        );

        $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args);
        $frame = $pkt->getvalue();

        $this->publish_cache[$cache_key] = $frame;
        if (count($this->publish_cache) > $this->publish_cache_max_size) {
            // Evict the entry at the front of the array, i.e. the oldest one still cached.
            reset($this->publish_cache);
            $old_key = key($this->publish_cache);
            unset($this->publish_cache[$old_key]);
        }

        // Return the local copy: the eviction above may have removed the entry just added.
        return $frame;
    }
1140
1141
    /**
     * Publishes a message.
     *
     * Writes the basic.publish method frame and sends it together with the message
     * content through the connection. If the connection turns out to be closed, the
     * channel is closed via do_close() and the exception is rethrown. When
     * $this->next_delivery_tag is greater than zero, the message is remembered in
     * $this->published_messages under that tag and the tag is incremented.
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int $ticket
     */
    public function basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        $method_frame = $this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket);

        $pkt = new AMQPWriter();
        $pkt->write($method_frame);

        try {
            $this->connection->send_content(
                $this->channel_id,
                60,
                0,
                mb_strlen($msg->body, 'ASCII'),
                $msg->serialize_properties(),
                $msg->body,
                $pkt
            );
        } catch (AMQPConnectionClosedException $closed) {
            $this->do_close();
            throw $closed;
        }

        if (!($this->next_delivery_tag > 0)) {
            return;
        }

        $this->published_messages[$this->next_delivery_tag] = $msg;
        // bcadd() keeps the counter exact as a numeric string.
        $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
    }
1182
1183
    /**
     * Queues a message for a later publish_batch() call.
     *
     * The arguments are not inspected here; the list of arguments actually passed by
     * the caller is appended to $this->batch_messages as-is.
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int $ticket
     */
    public function batch_basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        $call_arguments = func_get_args();

        $this->batch_messages[] = $call_arguments;
    }
1201
1202
    /**
     * Publishes every message queued with batch_basic_publish() in a single write.
     *
     * Does nothing when no messages are queued. Otherwise all method frames and
     * message contents are accumulated in one writer, written to the connection
     * at once, and the queue is emptied.
     *
     * @return void
     */
    public function publish_batch()
    {
        if (empty($this->batch_messages)) {
            return;
        }

        /** @var AMQPWriter $pkt */
        $pkt = new AMQPWriter();

        foreach ($this->batch_messages as $queued) {
            /** @var AMQPMessage $msg */
            $msg = $queued[0];

            // Arguments the caller did not pass (or passed as null) fall back to the defaults.
            $exchange = isset($queued[1]) ? $queued[1] : '';
            $routing_key = isset($queued[2]) ? $queued[2] : '';
            $mandatory = isset($queued[3]) ? $queued[3] : false;
            $immediate = isset($queued[4]) ? $queued[4] : false;
            $ticket = isset($queued[5]) ? $queued[5] : null;

            $method_frame = $this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket);
            $pkt->write($method_frame);

            $this->connection->prepare_content(
                $this->channel_id,
                60,
                0,
                mb_strlen($msg->body, 'ASCII'),
                $msg->serialize_properties(),
                $msg->body,
                $pkt
            );

            if (!($this->next_delivery_tag > 0)) {
                continue;
            }

            $this->published_messages[$this->next_delivery_tag] = $msg;
            $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
        }

        // Everything accumulated above goes out in one write.
        $this->connection->write($pkt->getvalue());
        $this->batch_messages = array();
    }
1247
1248
    /**
     * Specifies QoS.
     *
     * Sends basic.qos and waits for basic.qos_ok.
     *
     * @param int $prefetch_size
     * @param int $prefetch_count
     * @param bool $a_global
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed the result of the wait
     */
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)
    {
        $frame = $this->protocolWriter->basicQos($prefetch_size, $prefetch_count, $a_global);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array($this->waitHelper->get_wait('basic.qos_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1271
1272
    /**
     * Confirms a QoS request.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function basic_qos_ok($reader)
    {
        // Intentionally empty: nothing is read from $reader and nothing is returned.
    }
1279
1280
    /**
     * Redelivers unacknowledged messages.
     *
     * Sends basic.recover and waits for basic.recover_ok.
     *
     * @param bool $requeue
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed the result of the wait
     */
    public function basic_recover($requeue = false)
    {
        $frame = $this->protocolWriter->basicRecover($requeue);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array($this->waitHelper->get_wait('basic.recover_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1296
1297
    /**
     * Confirms the requested recover.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function basic_recover_ok($reader)
    {
        // Intentionally empty: nothing is read from $reader and nothing is returned.
    }
1304
1305
    /**
     * Rejects an incoming message.
     *
     * Builds a basic.reject frame through the protocol writer and sends it; no reply is awaited.
     *
     * @param string $delivery_tag
     * @param bool $requeue
     */
    public function basic_reject($delivery_tag, $requeue)
    {
        $frame = $this->protocolWriter->basicReject($delivery_tag, $requeue);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);
    }
1316
1317
    /**
     * Returns a failed message.
     *
     * When a callable is stored in $this->basic_return_callback it is invoked with the
     * reply code, reply text, exchange, routing key and the message, in that order.
     * Otherwise the frame is left unread and a debug message is logged.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     * @return null
     */
    protected function basic_return($reader, $message)
    {
        $listener = $this->basic_return_callback;

        if (is_callable($listener)) {
            // The fields are read in the order in which they appear in the frame.
            $listener_arguments = array();
            $listener_arguments[] = $reader->read_short();
            $listener_arguments[] = $reader->read_shortstr();
            $listener_arguments[] = $reader->read_shortstr();
            $listener_arguments[] = $reader->read_shortstr();
            $listener_arguments[] = $message;

            call_user_func_array($listener, $listener_arguments);

            return;
        }

        $this->debug->debug_msg('Skipping unhandled basic_return message');

        return null;
    }
1345
1346
    /**
     * Sends the (90, 20) method frame and waits for tx.commit_ok.
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed the result of the wait
     */
    public function tx_commit()
    {
        $this->send_method_frame(array(90, 20));

        $expected = array($this->waitHelper->get_wait('tx.commit_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1358
1359
    /**
     * Confirms a successful commit.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function tx_commit_ok($reader)
    {
        // Intentionally empty: nothing is read from $reader and nothing is returned.
    }
1366
1367
    /**
     * Rolls back the current transaction.
     *
     * Sends the (90, 30) method frame and waits for tx.rollback_ok.
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed the result of the wait
     */
    public function tx_rollback()
    {
        $this->send_method_frame(array(90, 30));

        $expected = array($this->waitHelper->get_wait('tx.rollback_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1381
1382
    /**
     * Confirms a successful rollback.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function tx_rollback_ok($reader)
    {
        // Intentionally empty: nothing is read from $reader and nothing is returned.
    }
1390
1391
    /**
     * Puts the channel into confirm mode.
     * Beware that only non-transactional channels may be put into confirm mode and vice versa.
     *
     * Sends confirm.select and, unless $nowait is set, waits for confirm.select_ok.
     * In both cases the delivery tag counter is started at 1 afterwards, so that
     * messages published from now on are tracked in $this->published_messages and
     * later acks/nacks from the server can be matched to them. (Previously the
     * counter was left untouched in nowait mode, so publishes were never tracked.)
     *
     * @param bool $nowait when true, no confirm.select_ok reply is awaited
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return null
     */
    public function confirm_select($nowait = false)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (!$nowait) {
            $this->wait(array(
                $this->waitHelper->get_wait('confirm.select_ok')
            ), false, $this->channel_rpc_timeout);
        }

        // basic_publish()/publish_batch() only track messages while this counter is > 0.
        $this->next_delivery_tag = 1;

        return null;
    }
1414
1415
    /**
     * Confirms a selection.
     *
     * @param AMQPReader $reader accepted but never read
     */
    public function confirm_select_ok($reader)
    {
        // Intentionally empty: nothing is read from $reader and nothing is returned.
    }
1423
1424
    /**
     * Waits for pending acks and nacks from the server.
     * If there are no pending acks, the method returns immediately.
     *
     * The loop runs until $this->published_messages is empty.
     *
     * @param int $timeout passed to each wait when greater than zero; otherwise each wait is called without a timeout
     */
    public function wait_for_pending_acks($timeout = 0)
    {
        $expected = array(
            $this->waitHelper->get_wait('basic.ack'),
            $this->waitHelper->get_wait('basic.nack'),
        );
        $has_timeout = $timeout > 0;

        while (count($this->published_messages) !== 0) {
            if (!$has_timeout) {
                $this->wait($expected);
                continue;
            }

            $this->wait($expected, true, $timeout);
        }
    }
1445
1446
    /**
     * Waits for pending acks, nacks and returns from the server.
     * If there are no pending acks, the method returns immediately.
     *
     * The loop runs until $this->published_messages is empty.
     *
     * @param int $timeout passed to each wait when greater than zero; otherwise each wait is called without a timeout
     */
    public function wait_for_pending_acks_returns($timeout = 0)
    {
        $expected = array(
            $this->waitHelper->get_wait('basic.ack'),
            $this->waitHelper->get_wait('basic.nack'),
            $this->waitHelper->get_wait('basic.return'),
        );
        $has_timeout = $timeout > 0;

        while (count($this->published_messages) !== 0) {
            if (!$has_timeout) {
                $this->wait($expected);
                continue;
            }

            $this->wait($expected, true, $timeout);
        }
    }
1467
1468
    /**
     * Selects standard transaction mode.
     *
     * Sends the (90, 10) method frame and waits for tx.select_ok.
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed the result of the wait
     */
    public function tx_select()
    {
        $this->send_method_frame(array(90, 10));

        $expected = array($this->waitHelper->get_wait('tx.select_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1482
1483
    /**
     * Confirms transaction mode.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function tx_select_ok($reader)
    {
        // Intentionally empty: nothing is read from $reader and nothing is returned.
    }
1490
1491
    /**
     * Normalizes a possibly-null arguments value to an array.
     *
     * Every call raises a silenced E_USER_DEPRECATED notice.
     *
     * @deprecated use an array as a default argument instead
     * @param array $arguments
     * @return array an empty array when $arguments is null, otherwise $arguments unchanged
     */
    protected function getArguments($arguments)
    {
        $notice = sprintf(
            'Method "%s" is deprecated, please use an array as a default argument instead',
            __METHOD__
        );
        // Silenced so the notice only surfaces through custom error handlers.
        @trigger_error($notice, E_USER_DEPRECATED);

        if (null === $arguments) {
            return array();
        }

        return $arguments;
    }
1503
1504
    /**
     * Resolves the ticket to use, falling back to the channel default.
     *
     * @param int|null $ticket
     * @return int $this->default_ticket when $ticket is null, otherwise $ticket unchanged
     */
    protected function getTicket($ticket)
    {
        if (null === $ticket) {
            return $this->default_ticket;
        }

        return $ticket;
    }
1512
1513
    /**
     * Helper method that takes one message out of $this->published_messages:
     * the entry stored under $index is removed from the array and returned.
     *
     * @param int $index key of the message in $this->published_messages
     * @return AMQPMessage
     */
    protected function get_and_unset_message($index)
    {
        $removed = $this->published_messages[$index];

        unset($this->published_messages[$index]);

        return $removed;
    }
1526
1527
    /**
     * Sets callback for basic_return
     *
     * The callback is stored in $this->basic_return_callback.
     *
     * @param  callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_return_listener($callback)
    {
        $this->assertHandlerIsCallable($callback);

        $this->basic_return_callback = $callback;
    }

    /**
     * Sets a handler which called for any message nack'ed by the server, with the AMQPMessage as first argument.
     *
     * The callback is stored in $this->nack_handler.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_nack_handler($callback)
    {
        $this->assertHandlerIsCallable($callback);

        $this->nack_handler = $callback;
    }

    /**
     * Sets a handler which called for any message ack'ed by the server, with the AMQPMessage as first argument.
     *
     * The callback is stored in $this->ack_handler.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_ack_handler($callback)
    {
        $this->assertHandlerIsCallable($callback);

        $this->ack_handler = $callback;
    }

    /**
     * Shared validation for the handler setters of this class.
     *
     * Does nothing when $callback is callable. Otherwise throws an
     * \InvalidArgumentException whose message names the rejected value and
     * its type.
     *
     * Arrays and objects without __toString() cannot be passed to a "%s"
     * placeholder safely (the conversion warns or fails), so a printable
     * stand-in is used for them; every other value is formatted as is.
     *
     * @param mixed $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    private function assertHandlerIsCallable($callback)
    {
        if (is_callable($callback)) {
            return;
        }

        if (is_array($callback)) {
            // Same text the string conversion would yield, without the warning.
            $printable = 'Array';
        } elseif (is_object($callback) && !method_exists($callback, '__toString')) {
            // Not convertible to string: report the class name instead.
            $printable = get_class($callback);
        } else {
            $printable = $callback;
        }

        throw new \InvalidArgumentException(sprintf(
            'Given callback "%s" should be callable. %s type was given.',
            $printable,
            gettype($callback)
        ));
    }
1583
}
1584