Completed
Push — master ( 5b578b...ff1afe )
by
unknown
06:14
created

AMQPChannel::is_consuming()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 2
1
<?php
2
namespace PhpAmqpLib\Channel;
3
4
use PhpAmqpLib\Exception\AMQPBasicCancelException;
5
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
6
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
7
use PhpAmqpLib\Exception\AMQPRuntimeException;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPWriter;
11
12
class AMQPChannel extends AbstractChannel
13
{
14
    /**
15
     * @var callable[]
16
     * @internal Use is_consuming() to check if there are active callbacks
17
     */
18
    public $callbacks = array();
19
20
    /** @var bool Whether or not the channel has been "opened" */
21
    protected $is_open = false;
22
23
    /** @var int */
24
    protected $default_ticket = 0;
25
26
    /** @var bool */
27
    protected $active = true;
28
29
    /** @var array */
30
    protected $alerts = array();
31
32
    /** @var bool */
33
    protected $auto_decode;
34
35
    /**
36
     * These parameters will be passed to function in case of basic_return:
37
     *    param int $reply_code
38
     *    param string $reply_text
39
     *    param string $exchange
40
     *    param string $routing_key
41
     *    param AMQPMessage $msg
42
     *
43
     * @var callable
44
     */
45
    protected $basic_return_callback;
46
47
    /** @var array Used to keep track of the messages that are going to be batch published. */
48
    protected $batch_messages = array();
49
50
    /**
51
     * If the channel is in confirm_publish mode this array will store all published messages
52
     * until they get ack'ed or nack'ed
53
     *
54
     * @var AMQPMessage[]
55
     */
56
    private $published_messages = array();
57
58
    /** @var int */
59
    private $next_delivery_tag = 0;
60
61
    /** @var callable */
62
    private $ack_handler;
63
64
    /** @var callable */
65
    private $nack_handler;
66
67
    /**
68
     * Circular buffer to speed up both basic_publish() and publish_batch().
69
     * Max size limited by $publish_cache_max_size.
70
     *
71
     * @var array
72
     * @see basic_publish()
73
     * @see publish_batch()
74
     */
75
    private $publish_cache = array();
76
77
    /**
78
     * Maximal size of $publish_cache
79
     *
80
     * @var int
81
     */
82
    private $publish_cache_max_size = 100;
83
84
    /**
85
     * Maximum time to wait for operations on this channel, in seconds.
86
     * @var float $channel_rpc_timeout
87
     */
88
    private $channel_rpc_timeout;
89
90
    /**
     * Creates the channel and immediately tries to open it on the given connection.
     *
     * When no channel id is supplied, a free one is requested from the connection.
     * If opening the channel throws, the channel is closed and the same exception
     * is rethrown to the caller.
     *
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
     * @param int|null $channel_id
     * @param bool $auto_decode
     * @param int|float $channel_rpc_timeout Maximum time to wait for operations on this channel, in seconds
     * @throws \Exception
     */
    public function __construct($connection, $channel_id = null, $auto_decode = true, $channel_rpc_timeout = 0)
    {
        // Loose comparison kept as-is: null, 0 and '' all lead to a freshly allocated id.
        if ($channel_id == null) {
            $channel_id = $connection->get_free_channel_id();
        }

        parent::__construct($connection, $channel_id);

        $this->debug->debug_msg('using channel_id: ' . $channel_id);

        $this->auto_decode = $auto_decode;
        // The property is documented as float; cast so integer input (e.g. the default 0)
        // is stored with the declared type.
        $this->channel_rpc_timeout = (float) $channel_rpc_timeout;

        try {
            $this->x_open();
        } catch (\Exception $e) {
            // Release the channel before propagating the failure.
            $this->close();
            throw $e;
        }
    }
117
118
    /**
     * Reports the current value of the channel's "opened" flag.
     *
     * @return bool
     */
    public function is_open()
    {
        return $this->is_open;
    }
125
126
    /**
     * Tears down this object after the close has been agreed with the server.
     *
     * Removes the channel from the connection's channel list (when an id is set),
     * drops the connection and channel id references, marks the channel as not
     * open and forgets all registered callbacks.
     */
    protected function do_close()
    {
        if ($this->channel_id !== null) {
            unset($this->connection->channels[$this->channel_id]);
        }

        // Same order as a chained assignment: connection first, then the id.
        $this->connection = null;
        $this->channel_id = null;

        $this->is_open = false;
        $this->callbacks = array();
    }
138
139
    /**
     * Only for AMQP 0.8.0.
     *
     * Handles a non-fatal warning sent by the server. Such warnings are used for
     * methods that are normally asynchronous and therefore have no confirmation,
     * but for which the server may still detect errors worth reporting. Fatal
     * errors are handled as channel or connection exceptions instead.
     *
     * The alert is stored in $this->alerts as array(reply code, reply text, details).
     *
     * @param AMQPReader $reader
     */
    protected function channel_alert($reader)
    {
        // Fields are consumed in wire order: short, short string, table.
        $code = $reader->read_short();
        $text = $reader->read_shortstr();
        $table = $reader->read_table();

        $this->alerts[] = array($code, $text, $table);
    }
157
158
    /**
     * Requests a channel close.
     *
     * Registered callbacks are always discarded. If the channel is not open or
     * has no connection, only the local teardown runs and null is returned.
     * Otherwise a channel.close frame is sent and the method waits for
     * 'channel.close_ok' using the channel RPC timeout. If sending the frame
     * throws, the local teardown runs before the exception is rethrown.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig Pair whose two elements are passed on as the 3rd and 4th channelClose() arguments
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        $this->callbacks = array();

        $already_closed = ($this->is_open === false || $this->connection === null);
        if ($already_closed) {
            $this->do_close();

            return null; // already closed
        }

        $frame = $this->protocolWriter->channelClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        list($class_id, $method_id, $args) = $frame;

        try {
            $this->send_method_frame(array($class_id, $method_id), $args);
        } catch (\Exception $failure) {
            $this->do_close();

            throw $failure;
        }

        $expected = array($this->waitHelper->get_wait('channel.close_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
194
195
    /**
     * Handles a close received from the peer.
     *
     * Reads the reply code, reply text and the class/method ids from the frame,
     * sends the method frame (20, 41) back, tears the channel down locally and
     * then always throws an exception carrying the received details.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException
     */
    protected function channel_close($reader)
    {
        $code = $reader->read_short();
        $text = $reader->read_shortstr();
        $failed_class = $reader->read_short();
        $failed_method = $reader->read_short();

        // NOTE(review): (20, 41) is presumably channel.close-ok — confirm against the protocol tables.
        $this->send_method_frame(array(20, 41));
        $this->do_close();

        $method_sig = array($failed_class, $failed_method);

        throw new AMQPProtocolChannelException($code, $text, $method_sig);
    }
211
212
    /**
     * Confirms a channel close.
     *
     * Simply delegates to AMQPChannel::do_close().
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function channel_close_ok($reader)
    {
        // $reader is unused; presumably kept so all frame handlers share one signature — confirm.
        $this->do_close();
    }
222
223
    /**
     * Enables or disables flow from the peer.
     *
     * Sends a channel.flow frame and waits for 'channel.flow_ok' using the
     * channel RPC timeout.
     *
     * @param bool $active
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function flow($active)
    {
        $frame = $this->protocolWriter->channelFlow($active);
        list($class_id, $method_id, $args) = $frame;
        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array($this->waitHelper->get_wait('channel.flow_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
239
240
    /**
     * Handles a flow request from the peer: stores the received bit in
     * $this->active and answers through x_flow_ok() with the same value.
     *
     * @param AMQPReader $reader
     */
    protected function channel_flow($reader)
    {
        $requested = $reader->read_bit();

        $this->active = $requested;
        $this->x_flow_ok($requested);
    }
248
249
    /**
     * Sends the reply to a flow request received from the peer.
     *
     * NOTE(review): the frame is built with channelFlow(), i.e. the same builder
     * flow() uses, rather than a dedicated flow-ok builder — confirm this is intended.
     *
     * @param bool $active
     */
    protected function x_flow_ok($active)
    {
        $frame = $this->protocolWriter->channelFlow($active);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);
    }
257
258
    /**
     * Handles the peer's flow confirmation by returning the bit read from the frame.
     *
     * @param AMQPReader $reader
     * @return bool
     */
    protected function channel_flow_ok($reader)
    {
        $active = $reader->read_bit();

        return $active;
    }
266
267
    /**
     * Opens the channel on the server.
     *
     * Returns null straight away when the channel is already flagged as open.
     * Otherwise sends a channel.open frame and waits for 'channel.open_ok'
     * using the channel RPC timeout.
     *
     * @param string $out_of_band
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    protected function x_open($out_of_band = '')
    {
        if ($this->is_open) {
            return null;
        }

        $frame = $this->protocolWriter->channelOpen($out_of_band);
        list($class_id, $method_id, $args) = $frame;
        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array($this->waitHelper->get_wait('channel.open_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
285
286
    /**
     * Handles the server's confirmation that the channel is open: flags the
     * channel as open and writes a debug message.
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function channel_open_ok($reader)
    {
        // $reader is unused; presumably kept so all frame handlers share one signature — confirm.
        $this->is_open = true;

        $this->debug->debug_msg('Channel open');
    }
295
296
    /**
     * Requests an access ticket.
     *
     * Sends an access.request frame and waits for 'access.request_ok' using the
     * channel RPC timeout.
     *
     * @param string $realm
     * @param bool $exclusive
     * @param bool $passive
     * @param bool $active
     * @param bool $write
     * @param bool $read
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function access_request(
        $realm,
        $exclusive = false,
        $passive = false,
        $active = false,
        $write = false,
        $read = false
    ) {
        $frame = $this->protocolWriter->accessRequest($realm, $exclusive, $passive, $active, $write, $read);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array($this->waitHelper->get_wait('access.request_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
331
332
    /**
     * Grants access to server resources.
     *
     * Stores the ticket read from the frame as the channel's default ticket and
     * returns it.
     *
     * @param AMQPReader $reader
     * @return string NOTE(review): the value comes from read_short(), so this is presumably an int — confirm
     */
    protected function access_request_ok($reader)
    {
        $granted = $reader->read_short();
        $this->default_ticket = $granted;

        return $granted;
    }
344
345
    /**
     * Declares an exchange.
     *
     * The ticket is resolved through getTicket(), an exchange.declare frame is
     * sent, and unless $nowait is set the method waits for 'exchange.declare_ok'
     * using the channel RPC timeout.
     *
     * @param string $exchange
     * @param string $type
     * @param bool $passive
     * @param bool $durable
     * @param bool $auto_delete
     * @param bool $internal
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is set, otherwise the result of wait()
     */
    public function exchange_declare(
        $exchange,
        $type,
        $passive = false,
        $durable = false,
        $auto_delete = true,
        $internal = false,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->exchangeDeclare(
            $ticket,
            $exchange,
            $type,
            $passive,
            $durable,
            $auto_delete,
            $internal,
            $nowait,
            $arguments
        );
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('exchange.declare_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
395
396
    /**
     * Confirms an exchange declaration. Nothing is read from the frame.
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function exchange_declare_ok($reader)
    {
        // Intentionally empty: $reader is unused, presumably kept for a uniform handler signature — confirm.
    }
403
404
    /**
     * Deletes an exchange.
     *
     * The ticket is resolved through getTicket(), an exchange.delete frame is
     * sent, and unless $nowait is set the method waits for 'exchange.delete_ok'
     * using the channel RPC timeout.
     *
     * @param string $exchange
     * @param bool $if_unused
     * @param bool $nowait
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is set, otherwise the result of wait()
     */
    public function exchange_delete(
        $exchange,
        $if_unused = false,
        $nowait = false,
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->exchangeDelete($ticket, $exchange, $if_unused, $nowait);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('exchange.delete_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
438
439
    /**
     * Confirms deletion of an exchange. Nothing is read from the frame.
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function exchange_delete_ok($reader)
    {
        // Intentionally empty: $reader is unused, presumably kept for a uniform handler signature — confirm.
    }
447
448
    /**
     * Binds the destination exchange to the source exchange.
     *
     * The ticket is resolved through getTicket(), an exchange.bind frame is
     * sent, and unless $nowait is set the method waits for 'exchange.bind_ok'
     * using the channel RPC timeout.
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is set, otherwise the result of wait()
     */
    public function exchange_bind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->exchangeBind(
            $ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('exchange.bind_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
489
490
    /**
     * Confirms that an exchange bind succeeded. Nothing is read from the frame.
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function exchange_bind_ok($reader)
    {
        // Intentionally empty: $reader is unused, presumably kept for a uniform handler signature — confirm.
    }
497
498
    /**
     * Unbinds the destination exchange from the source exchange.
     *
     * The ticket is resolved through getTicket() and an exchange.unbind frame is
     * sent. Unless $nowait is set, the method then waits for 'exchange.unbind_ok'
     * using the channel RPC timeout.
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is set, otherwise the result of wait()
     */
    public function exchange_unbind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(
            $ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        // With nowait no unbind_ok reply is expected, so waiting for one would block
        // (or time out). Return immediately, as the other nowait-capable methods do.
        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('exchange.unbind_ok')
        ), false, $this->channel_rpc_timeout);
    }
535
536
    /**
     * Confirms that an exchange unbind succeeded. Nothing is read from the frame.
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function exchange_unbind_ok($reader)
    {
        // Intentionally empty: $reader is unused, presumably kept for a uniform handler signature — confirm.
    }
544
545
    /**
     * Binds a queue to an exchange.
     *
     * The ticket is resolved through getTicket(), a queue.bind frame is sent,
     * and unless $nowait is set the method waits for 'queue.bind_ok' using the
     * channel RPC timeout.
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is set, otherwise the result of wait()
     */
    public function queue_bind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->queueBind(
            $ticket,
            $queue,
            $exchange,
            $routing_key,
            $nowait,
            $arguments
        );
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('queue.bind_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
586
587
    /**
     * Confirms that a queue bind succeeded. Nothing is read from the frame.
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function queue_bind_ok($reader)
    {
        // Intentionally empty: $reader is unused, presumably kept for a uniform handler signature — confirm.
    }
595
596
    /**
     * Unbinds a queue from an exchange.
     *
     * The ticket is resolved through getTicket(), a queue.unbind frame is sent
     * and the method waits for 'queue.unbind_ok' using the channel RPC timeout.
     * This method has no nowait option.
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function queue_unbind(
        $queue,
        $exchange,
        $routing_key = '',
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->queueUnbind($ticket, $queue, $exchange, $routing_key, $arguments);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array($this->waitHelper->get_wait('queue.unbind_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
630
631
    /**
     * Confirms that a queue unbind succeeded. Nothing is read from the frame.
     *
     * @param AMQPReader $reader Not read by this handler
     */
    protected function queue_unbind_ok($reader)
    {
        // Intentionally empty: $reader is unused, presumably kept for a uniform handler signature — confirm.
    }
638
639
    /**
     * Declares a queue, creating it if needed.
     *
     * The ticket is resolved through getTicket(), a queue.declare frame is sent,
     * and unless $nowait is set the method waits for 'queue.declare_ok' using
     * the channel RPC timeout.
     *
     * @param string $queue
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param bool $auto_delete
     * @param bool $nowait
     * @param array $arguments
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is set, otherwise the result of wait()
     */
    public function queue_declare(
        $queue = '',
        $passive = false,
        $durable = false,
        $exclusive = false,
        $auto_delete = true,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->queueDeclare(
            $ticket,
            $queue,
            $passive,
            $durable,
            $exclusive,
            $auto_delete,
            $nowait,
            $arguments
        );
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('queue.declare_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
686
687
    /**
     * Confirms a queue definition.
     *
     * Reads the queue name, message count and consumer count from the frame, in
     * that order, and returns them as a three-element list.
     *
     * @param AMQPReader $reader
     * @return string[] NOTE(review): the two counts come from read_long(), so they are presumably numeric — confirm
     */
    protected function queue_declare_ok($reader)
    {
        $name = $reader->read_shortstr();
        $messages = $reader->read_long();
        $consumers = $reader->read_long();

        return array($name, $messages, $consumers);
    }
701
702
    /**
     * Deletes a queue.
     *
     * The ticket is resolved through getTicket(), a queue.delete frame is sent,
     * and unless $nowait is set the method waits for 'queue.delete_ok' using
     * the channel RPC timeout.
     *
     * @param string $queue
     * @param bool $if_unused
     * @param bool $if_empty
     * @param bool $nowait
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is set, otherwise the result of wait()
     */
    public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->queueDelete($ticket, $queue, $if_unused, $if_empty, $nowait);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('queue.delete_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
735
736
    /**
     * Confirms deletion of a queue by returning the long value read from the frame.
     *
     * @param AMQPReader $reader
     * @return string NOTE(review): value comes from read_long(); presumably the deleted message count — confirm
     */
    protected function queue_delete_ok($reader)
    {
        $count = $reader->read_long();

        return $count;
    }
746
747
    /**
     * Purges a queue.
     *
     * The ticket is resolved through getTicket(), a queue.purge frame is sent,
     * and unless $nowait is set the method waits for 'queue.purge_ok' using the
     * channel RPC timeout.
     *
     * @param string $queue
     * @param bool $nowait
     * @param int $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null null when $nowait is set, otherwise the result of wait()
     */
    public function queue_purge($queue = '', $nowait = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);

        $frame = $this->protocolWriter->queuePurge($ticket, $queue, $nowait);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        $expected = array($this->waitHelper->get_wait('queue.purge_ok'));

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
771
772
    /**
     * Confirms a queue purge by returning the long value read from the frame.
     *
     * @param AMQPReader $reader
     * @return string NOTE(review): value comes from read_long(); presumably the purged message count — confirm
     */
    protected function queue_purge_ok($reader)
    {
        $count = $reader->read_long();

        return $count;
    }
782
783
    /**
     * Acknowledges one or more messages.
     *
     * Sends a basic.ack frame; no reply is waited for.
     *
     * @param string $delivery_tag
     * @param bool $multiple
     */
    public function basic_ack($delivery_tag, $multiple = false)
    {
        $frame = $this->protocolWriter->basicAck($delivery_tag, $multiple);
        list($class_id, $method_id, $args) = $frame;

        $this->send_method_frame(array($class_id, $method_id), $args);
    }
794
795
    /**
     * Called when the server sends a basic.ack.
     *
     * Reads the delivery tag and the "multiple" bit, rejects tags that are not
     * present in the published-messages store, and otherwise hands the tag to
     * internal_ack_handler() together with the ack handler.
     *
     * @param AMQPReader $reader
     * @throws AMQPRuntimeException when the delivery tag is not among the published messages
     */
    protected function basic_ack_from_server(AMQPReader $reader)
    {
        $delivery_tag = $reader->read_longlong();
        $multiple = (bool) $reader->read_bit();

        $is_known = isset($this->published_messages[$delivery_tag]);
        if (!$is_known) {
            $error = sprintf('Server ack\'ed unknown delivery_tag "%s"', $delivery_tag);

            throw new AMQPRuntimeException($error);
        }

        $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);
    }
815
816
    /**
     * Called when the server sends a basic.nack.
     *
     * Reads the delivery tag and the "multiple" bit, rejects tags that are not
     * present in the published-messages store, and otherwise hands the tag to
     * internal_ack_handler() together with the nack handler.
     *
     * @param AMQPReader $reader
     * @throws AMQPRuntimeException when the delivery tag is not among the published messages
     */
    protected function basic_nack_from_server($reader)
    {
        $delivery_tag = $reader->read_longlong();
        $multiple = (bool) $reader->read_bit();

        $is_known = isset($this->published_messages[$delivery_tag]);
        if (!$is_known) {
            $error = sprintf('Server nack\'ed unknown delivery_tag "%s"', $delivery_tag);

            throw new AMQPRuntimeException($error);
        }

        $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);
    }
836
837
    /**
     * Removes acknowledged messages from $this->published_messages and
     * dispatches them to the given handler.
     *
     * For a single tag the message is fetched via get_and_unset_message(), its
     * delivery_info['delivery_tag'] is set, and it is passed to the handler.
     * With $multiple set, the same is done one by one for every stored tag that
     * is less than or equal to $delivery_tag.
     *
     * @param string $delivery_tag
     * @param bool $multiple
     * @param callable $handler
     */
    protected function internal_ack_handler($delivery_tag, $multiple, $handler)
    {
        if (!$multiple) {
            $message = $this->get_and_unset_message($delivery_tag);
            $message->delivery_info['delivery_tag'] = $delivery_tag;
            $this->dispatch_to_handler($handler, array($message));

            return;
        }

        // The tag list is computed up front, before any message is removed.
        $tags = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag);

        foreach ($tags as $tag) {
            $this->internal_ack_handler($tag, false, $handler);
        }
    }
859
860
    /**
     * Collects the keys of $messages that compare less than or equal to $value.
     *
     * Keys are compared with bccomp() at scale 0, i.e. as arbitrary-precision integers.
     * The result keeps the iteration order of $messages.
     *
     * @param AMQPMessage[] $messages
     * @param string $value
     * @return array list of matching keys (empty when nothing matches)
     */
    protected function get_keys_less_or_equal(array $messages, $value)
    {
        $matching = array();

        foreach (array_keys($messages) as $candidate) {
            if (bccomp($candidate, $value, 0) <= 0) {
                $matching[] = $candidate;
            }
        }

        return $matching;
    }
885
886
    /**
     * Rejects one or several received messages
     *
     * Builds the basic.nack method via the protocol writer and sends it as a method frame.
     * No reply is awaited.
     *
     * @param string $delivery_tag
     * @param bool $multiple
     * @param bool $requeue
     */
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
    {
        $method = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue);
        list($class_id, $method_id, $args) = $method;

        $this->send_method_frame(array($class_id, $method_id), $args);
    }
898
899
    /**
     * Ends a queue consumer
     *
     * Sends basic.cancel. When either $nowait or $noreturn is set, the local callback for
     * the consumer is dropped right away and the tag is returned without waiting.
     * Otherwise the method waits for basic.cancel_ok and returns whatever wait() yields.
     *
     * @param string $consumer_tag
     * @param bool $nowait
     * @param bool $noreturn
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->basicCancel($consumer_tag, $nowait);
        $this->send_method_frame(array($class_id, $method_id), $args);

        $await_reply = !$nowait && !$noreturn;

        if ($await_reply) {
            $expected = array(
                $this->waitHelper->get_wait('basic.cancel_ok')
            );

            return $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        unset($this->callbacks[$consumer_tag]);

        return $consumer_tag;
    }
922
923
    /**
     * Raises an AMQPBasicCancelException carrying the short string read from the frame.
     *
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException always
     */
    protected function basic_cancel_from_server(AMQPReader $reader)
    {
        $tag = $reader->read_shortstr();

        throw new AMQPBasicCancelException($tag);
    }
931
932
    /**
     * Confirm a cancelled consumer
     *
     * Reads the consumer tag from the frame and removes its entry from $this->callbacks.
     *
     * @param AMQPReader $reader
     * @return string the consumer tag that was read
     */
    protected function basic_cancel_ok($reader)
    {
        $cancelled_tag = $reader->read_shortstr();

        unset($this->callbacks[$cancelled_tag]);

        return $cancelled_tag;
    }
945
946
    /**
     * Tells whether at least one consumer callback is currently registered.
     *
     * @return bool false when $this->callbacks is empty, true otherwise
     */
    public function is_consuming()
    {
        if (empty($this->callbacks)) {
            return false;
        }

        return true;
    }
953
954
    /**
     * Starts a queue consumer
     *
     * Sends basic.consume and, unless $nowait is strictly true-ish (anything other than
     * boolean false skips the wait), waits for basic.consume_ok and adopts the tag returned
     * by wait(). The callback is then stored under the resulting consumer tag.
     *
     * @param string $queue
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack
     * @param bool $exclusive
     * @param bool $nowait
     * @param callable|null $callback
     * @param int|null $ticket
     * @param array $arguments only forwarded when the protocol version is '0.9.1'
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|string
     */
    public function basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array()
    ) {
        $ticket = $this->getTicket($ticket);

        // Other protocol versions get null instead of the arguments table.
        $consume_arguments = null;
        if ($this->protocolVersion == '0.9.1') {
            $consume_arguments = $arguments;
        }

        list($class_id, $method_id, $args) = $this->protocolWriter->basicConsume(
            $ticket,
            $queue,
            $consumer_tag,
            $no_local,
            $no_ack,
            $exclusive,
            $nowait,
            $consume_arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait === false) {
            $expected = array(
                $this->waitHelper->get_wait('basic.consume_ok')
            );
            $consumer_tag = $this->wait($expected, false, $this->channel_rpc_timeout);
        }

        $this->callbacks[$consumer_tag] = $callback;

        return $consumer_tag;
    }
1004
1005
    /**
     * Confirms a new consumer
     *
     * @param AMQPReader $reader
     * @return string the short string read from the frame (the consumer tag)
     */
    protected function basic_consume_ok($reader)
    {
        $consumer_tag = $reader->read_shortstr();

        return $consumer_tag;
    }
1015
1016
    /**
     * Notifies the client of a consumer message
     *
     * Fills $message->delivery_info from the frame and, when a callback is registered
     * for the consumer tag, invokes it with the message.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     */
    protected function basic_deliver($reader, $message)
    {
        // The reads below are kept in the original order; presumably the reader is
        // consumed sequentially, so reordering would mix up the fields.
        $info = array('channel' => $this);
        $info['consumer_tag'] = $reader->read_shortstr();
        $info['delivery_tag'] = $reader->read_longlong();
        $info['redelivered'] = $reader->read_bit();
        $info['exchange'] = $reader->read_shortstr();
        $info['routing_key'] = $reader->read_shortstr();

        $message->delivery_info = $info;

        $tag = $info['consumer_tag'];
        if (!isset($this->callbacks[$tag])) {
            return;
        }

        call_user_func($this->callbacks[$tag], $message);
    }
1043
1044
    /**
     * Direct access to a queue if no message was available in the queue, return null
     *
     * Sends basic.get and waits for either basic.get_ok or basic.get_empty.
     *
     * @param string $queue
     * @param bool $no_ack
     * @param int $ticket falls back to the channel's default ticket when null
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_get($queue = '', $no_ack = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->basicGet($ticket, $queue, $no_ack);

        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array(
            $this->waitHelper->get_wait('basic.get_ok'),
            $this->waitHelper->get_wait('basic.get_empty'),
        );

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1065
1066
    /**
     * Indicates no messages available
     *
     * The body is intentionally empty, so the method yields null.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function basic_get_empty($reader)
    {
        // Nothing to do.
    }
1074
1075
    /**
     * Provides client with a message
     *
     * Reads the delivery metadata from the frame, stores it in $message->delivery_info
     * and hands the same message object back.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     * @return AMQPMessage
     */
    protected function basic_get_ok($reader, $message)
    {
        // Read order is kept exactly as in the original; presumably it mirrors the frame layout.
        $info = array();
        $info['delivery_tag'] = $reader->read_longlong();
        $info['redelivered'] = $reader->read_bit();
        $info['exchange'] = $reader->read_shortstr();
        $info['routing_key'] = $reader->read_shortstr();
        $info['message_count'] = $reader->read_long();

        $message->delivery_info = $info;

        return $message;
    }
1100
1101
    /**
     * Returns the serialized basic.publish method frame for the given parameters,
     * building it on first use and memoizing it in $this->publish_cache.
     *
     * When the cache grows beyond $this->publish_cache_max_size, the first entry in
     * iteration order (the oldest insertion) is dropped.
     *
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int|null $ticket falls back to the channel's default ticket when null
     * @return mixed the value returned by the prepared frame's getvalue()
     */
    private function pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket)
    {
        // serialize() length-prefixes each string, so two different parameter tuples can
        // never map to the same key. The former '|'-joined key was ambiguous as soon as an
        // exchange or routing key contained '|' ("a|b" + "c" equalled "a" + "b|c"), which
        // could return a cached frame addressed to the wrong exchange/routing key.
        $cache_key = serialize(array($exchange, $routing_key, $mandatory, $immediate, $ticket));

        if (isset($this->publish_cache[$cache_key])) {
            return $this->publish_cache[$cache_key];
        }

        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish(
            $ticket,
            $exchange,
            $routing_key,
            $mandatory,
            $immediate
        );

        $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args);
        $frame = $pkt->getvalue();
        $this->publish_cache[$cache_key] = $frame;

        if (count($this->publish_cache) > $this->publish_cache_max_size) {
            // Evict the oldest entry (arrays iterate in insertion order).
            reset($this->publish_cache);
            $old_key = key($this->publish_cache);
            unset($this->publish_cache[$old_key]);
        }

        // Return the local copy: the eviction above may have removed the entry that was
        // just stored (cache size limit below 1), and re-reading it would hit an
        // undefined index.
        return $frame;
    }
1140
1141
    /**
     * Publishes a message
     *
     * Writes the (cached) basic.publish method frame into a fresh writer and sends it
     * together with the message properties and body. If the connection turns out to be
     * closed, the channel is closed locally and the exception is re-thrown.
     * While $this->next_delivery_tag is positive, the message is recorded in
     * $this->published_messages and the tag is incremented.
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int $ticket
     * @throws AMQPConnectionClosedException re-thrown after the channel has been closed locally
     */
    public function basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        $pkt = new AMQPWriter();
        $method_frame = $this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket);
        $pkt->write($method_frame);

        try {
            // 60 / 0 are passed through unchanged; 60 is presumably the AMQP "basic"
            // class id and 0 the weight - TODO confirm against send_content().
            $this->connection->send_content(
                $this->channel_id,
                60,
                0,
                mb_strlen($msg->body, 'ASCII'),
                $msg->serialize_properties(),
                $msg->body,
                $pkt
            );
        } catch (AMQPConnectionClosedException $closed) {
            $this->do_close();

            throw $closed;
        }

        if (!($this->next_delivery_tag > 0)) {
            return;
        }

        $this->published_messages[$this->next_delivery_tag] = $msg;
        // bcadd() yields a string, so the tag becomes a numeric string from here on.
        $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
    }
1182
1183
    /**
     * Queues a publish call for a later publish_batch().
     *
     * The arguments are not inspected here: the raw argument list of the call is appended
     * to $this->batch_messages as-is.
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int $ticket
     */
    public function batch_basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        // func_get_args() only contains what the caller actually passed.
        $call_arguments = func_get_args();

        $this->batch_messages[] = $call_arguments;
    }
1201
1202
    /**
     * Publish batch
     *
     * Serializes every queued batch_basic_publish() call into a single writer, sends the
     * accumulated bytes with one connection write and clears the queue. Does nothing when
     * the queue is empty.
     *
     * @return void
     */
    public function publish_batch()
    {
        if (empty($this->batch_messages)) {
            return null;
        }

        // Positional fallbacks for arguments the caller did not pass (or passed as null).
        $fallbacks = array(1 => '', 2 => '', 3 => false, 4 => false, 5 => null);

        /** @var AMQPWriter $pkt */
        $pkt = new AMQPWriter();

        foreach ($this->batch_messages as $queued) {
            /** @var AMQPMessage $msg */
            $msg = $queued[0];

            $params = array();
            foreach ($fallbacks as $position => $fallback) {
                $params[$position] = isset($queued[$position]) ? $queued[$position] : $fallback;
            }

            $method_frame = $this->pre_publish($params[1], $params[2], $params[3], $params[4], $params[5]);
            $pkt->write($method_frame);

            $this->connection->prepare_content(
                $this->channel_id,
                60,
                0,
                mb_strlen($msg->body, 'ASCII'),
                $msg->serialize_properties(),
                $msg->body,
                $pkt
            );

            if (!($this->next_delivery_tag > 0)) {
                continue;
            }

            $this->published_messages[$this->next_delivery_tag] = $msg;
            $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
        }

        // Everything collected above goes out in a single write.
        $this->connection->write($pkt->getvalue());
        $this->batch_messages = array();
    }
1247
1248
    /**
     * Specifies QoS
     *
     * Sends basic.qos and waits for basic.qos_ok.
     *
     * @param int $prefetch_size
     * @param int $prefetch_count
     * @param bool $a_global
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)
    {
        $method = $this->protocolWriter->basicQos($prefetch_size, $prefetch_count, $a_global);
        list($class_id, $method_id, $args) = $method;

        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array(
            $this->waitHelper->get_wait('basic.qos_ok')
        );

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1271
1272
    /**
     * Confirms QoS request
     *
     * The body is intentionally empty, so the method yields null.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function basic_qos_ok($reader)
    {
        // Nothing to do.
    }
1279
1280
    /**
     * Redelivers unacknowledged messages
     *
     * Sends basic.recover and waits for basic.recover_ok.
     *
     * @param bool $requeue
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_recover($requeue = false)
    {
        $method = $this->protocolWriter->basicRecover($requeue);
        list($class_id, $method_id, $args) = $method;
        $this->send_method_frame(array($class_id, $method_id), $args);

        $expected = array(
            $this->waitHelper->get_wait('basic.recover_ok')
        );

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1296
1297
    /**
     * Confirm the requested recover
     *
     * The body is intentionally empty, so the method yields null.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function basic_recover_ok($reader)
    {
        // Nothing to do.
    }
1304
1305
    /**
     * Rejects an incoming message
     *
     * Builds basic.reject via the protocol writer and sends it; no reply is awaited.
     *
     * @param string $delivery_tag
     * @param bool $requeue
     */
    public function basic_reject($delivery_tag, $requeue)
    {
        $method = $this->protocolWriter->basicReject($delivery_tag, $requeue);
        list($class_id, $method_id, $args) = $method;

        $this->send_method_frame(array($class_id, $method_id), $args);
    }
1316
1317
    /**
     * Returns a failed message
     *
     * When a callable return listener is set, it is invoked with the reply code, reply
     * text, exchange, routing key (all read from the frame, in that order) and the message.
     * Without a callable listener the frame is left unread and a debug message is logged.
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     * @return null
     */
    protected function basic_return($reader, $message)
    {
        $listener = $this->basic_return_callback;

        if (is_callable($listener)) {
            // Array elements are evaluated left to right, preserving the original read order.
            $listener_arguments = array(
                $reader->read_short(),
                $reader->read_shortstr(),
                $reader->read_shortstr(),
                $reader->read_shortstr(),
                $message,
            );

            call_user_func_array($listener, $listener_arguments);

            return;
        }

        $this->debug->debug_msg('Skipping unhandled basic_return message');

        return null;
    }
1345
1346
    /**
     * Sends the method frame with class id 90 / method id 20 and waits for tx.commit_ok.
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function tx_commit()
    {
        // (class id, method id) - presumably tx.commit, matching the awaited reply.
        $method_signature = array(90, 20);
        $this->send_method_frame($method_signature);

        $expected = array(
            $this->waitHelper->get_wait('tx.commit_ok')
        );

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1358
1359
    /**
     * Confirms a successful commit
     *
     * The body is intentionally empty, so the method yields null.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function tx_commit_ok($reader)
    {
        // Nothing to do.
    }
1366
1367
    /**
     * Rollbacks the current transaction
     *
     * Sends the method frame with class id 90 / method id 30 and waits for tx.rollback_ok.
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function tx_rollback()
    {
        // (class id, method id) - presumably tx.rollback, matching the awaited reply.
        $method_signature = array(90, 30);
        $this->send_method_frame($method_signature);

        $expected = array(
            $this->waitHelper->get_wait('tx.rollback_ok')
        );

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1381
1382
    /**
     * Confirms a successful rollback
     *
     * The body is intentionally empty, so the method yields null.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function tx_rollback_ok($reader)
    {
        // Nothing to do.
    }
1390
1391
    /**
     * Puts the channel into confirm mode
     * Beware that only non-transactional channels may be put into confirm mode and vice versa
     *
     * Without $nowait the method waits for confirm.select_ok and then starts delivery-tag
     * tracking at 1. With $nowait it returns right after sending the frame.
     *
     * NOTE(review): on the $nowait path $this->next_delivery_tag is left untouched, so
     * basic_publish() will not record published messages - confirm whether that is intended.
     *
     * @param bool $nowait
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return null
     */
    public function confirm_select($nowait = false)
    {
        $method = $this->protocolWriter->confirmSelect($nowait);
        list($class_id, $method_id, $args) = $method;

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        $expected = array(
            $this->waitHelper->get_wait('confirm.select_ok')
        );
        $this->wait($expected, false, $this->channel_rpc_timeout);

        $this->next_delivery_tag = 1;
    }
1414
1415
    /**
     * Confirms a selection
     *
     * The body is intentionally empty, so the method yields null.
     *
     * @param AMQPReader $reader accepted but never read
     */
    public function confirm_select_ok($reader)
    {
        // Nothing to do.
    }
1423
1424
    /**
     * Waits for pending acks and nacks from the server.
     * If there are no pending acks, the method returns immediately
     *
     * Keeps calling wait() for basic.ack / basic.nack until $this->published_messages is empty.
     *
     * @param int $timeout when greater than 0 it is forwarded to every wait() call;
     *                     otherwise wait() is called with its default arguments
     */
    public function wait_for_pending_acks($timeout = 0)
    {
        $expected = array(
            $this->waitHelper->get_wait('basic.ack'),
            $this->waitHelper->get_wait('basic.nack'),
        );
        $use_timeout = $timeout > 0;

        while (count($this->published_messages) !== 0) {
            if (!$use_timeout) {
                $this->wait($expected);
                continue;
            }

            $this->wait($expected, true, $timeout);
        }
    }
1445
1446
    /**
     * Waits for pending acks, nacks and returns from the server. If there are no pending acks, the method returns immediately.
     *
     * Keeps calling wait() for basic.ack / basic.nack / basic.return until
     * $this->published_messages is empty.
     *
     * @param int $timeout If set to value > 0 it is forwarded to every wait() call;
     *                     otherwise wait() is called with its default arguments.
     */
    public function wait_for_pending_acks_returns($timeout = 0)
    {
        $expected = array(
            $this->waitHelper->get_wait('basic.ack'),
            $this->waitHelper->get_wait('basic.nack'),
            $this->waitHelper->get_wait('basic.return'),
        );
        $use_timeout = $timeout > 0;

        while (count($this->published_messages) !== 0) {
            if (!$use_timeout) {
                $this->wait($expected);
                continue;
            }

            $this->wait($expected, true, $timeout);
        }
    }
1467
1468
    /**
     * Selects standard transaction mode
     *
     * Sends the method frame with class id 90 / method id 10 and waits for tx.select_ok.
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function tx_select()
    {
        // (class id, method id) - presumably tx.select, matching the awaited reply.
        $method_signature = array(90, 10);
        $this->send_method_frame($method_signature);

        $expected = array(
            $this->waitHelper->get_wait('tx.select_ok')
        );

        return $this->wait($expected, false, $this->channel_rpc_timeout);
    }
1482
1483
    /**
     * Confirms transaction mode
     *
     * The body is intentionally empty, so the method yields null.
     *
     * @param AMQPReader $reader accepted but never read
     */
    protected function tx_select_ok($reader)
    {
        // Nothing to do.
    }
1490
1491
    /**
     * Normalizes an arguments value: null becomes an empty array, anything else is returned unchanged.
     *
     * Deprecated - every call raises a (silenced) E_USER_DEPRECATED notice.
     *
     * @param array $arguments
     * @return array
     */
    protected function getArguments($arguments)
    {
        $notice = sprintf(
            'Method "%s" is deprecated, please use an array as a default argument instead',
            __METHOD__
        );
        // Silenced with @ so the notice only reaches explicitly registered error handlers.
        @trigger_error($notice, E_USER_DEPRECATED);

        if ($arguments === null) {
            return array();
        }

        return $arguments;
    }
1503
1504
    /**
     * Resolves the ticket to use for a request.
     *
     * @param int $ticket explicit ticket, or null to fall back to the channel default
     * @return int $ticket when it is not null, $this->default_ticket otherwise
     */
    protected function getTicket($ticket)
    {
        if (null === $ticket) {
            return $this->default_ticket;
        }

        return $ticket;
    }
1512
1513
    /**
     * Helper method to get a particular message from $this->published_messages;
     * removes it from the array and returns it.
     *
     * @param int $index key of the message inside $this->published_messages
     * @return AMQPMessage the message that was stored under $index
     */
    protected function get_and_unset_message($index)
    {
        // Take the entry first, then drop it from the pending list.
        $published = $this->published_messages[$index];

        unset($this->published_messages[$index]);

        return $published;
    }
1526
1527
    /**
     * Sets callback for basic_return
     *
     * The callback is stored in $this->basic_return_callback. According to that
     * property's documentation it is invoked with: int $reply_code,
     * string $reply_text, string $exchange, string $routing_key, AMQPMessage $msg.
     *
     * @param  callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_return_listener($callback)
    {
        if (!is_callable($callback)) {
            // "%s" cannot safely receive every value: an array triggers an
            // "Array to string conversion" notice and an object without
            // __toString() raises a conversion error instead of the documented
            // InvalidArgumentException. Build a printable description first.
            if (is_array($callback)) {
                $description = 'Array';
            } elseif (is_object($callback) && !method_exists($callback, '__toString')) {
                $description = get_class($callback);
            } else {
                // Scalars, null, resources and stringable objects format as before.
                $description = $callback;
            }

            throw new \InvalidArgumentException(sprintf(
                'Given callback "%s" should be callable. %s type was given.',
                $description,
                gettype($callback)
            ));
        }

        $this->basic_return_callback = $callback;
    }
1545
1546
    /**
     * Sets a handler which is called for any message nack'ed by the server, with the AMQPMessage as first argument.
     *
     * The handler is stored in $this->nack_handler.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_nack_handler($callback)
    {
        if (!is_callable($callback)) {
            // "%s" cannot safely receive every value: an array triggers an
            // "Array to string conversion" notice and an object without
            // __toString() raises a conversion error instead of the documented
            // InvalidArgumentException. Build a printable description first.
            if (is_array($callback)) {
                $description = 'Array';
            } elseif (is_object($callback) && !method_exists($callback, '__toString')) {
                $description = get_class($callback);
            } else {
                // Scalars, null, resources and stringable objects format as before.
                $description = $callback;
            }

            throw new \InvalidArgumentException(sprintf(
                'Given callback "%s" should be callable. %s type was given.',
                $description,
                gettype($callback)
            ));
        }

        $this->nack_handler = $callback;
    }
1564
1565
    /**
     * Sets a handler which is called for any message ack'ed by the server, with the AMQPMessage as first argument.
     *
     * The handler is stored in $this->ack_handler.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_ack_handler($callback)
    {
        if (!is_callable($callback)) {
            // "%s" cannot safely receive every value: an array triggers an
            // "Array to string conversion" notice and an object without
            // __toString() raises a conversion error instead of the documented
            // InvalidArgumentException. Build a printable description first.
            if (is_array($callback)) {
                $description = 'Array';
            } elseif (is_object($callback) && !method_exists($callback, '__toString')) {
                $description = get_class($callback);
            } else {
                // Scalars, null, resources and stringable objects format as before.
                $description = $callback;
            }

            throw new \InvalidArgumentException(sprintf(
                'Given callback "%s" should be callable. %s type was given.',
                $description,
                gettype($callback)
            ));
        }

        $this->ack_handler = $callback;
    }
1583
}
1584