Completed
Push — master ( 9c1420...6b08d0 )
by
unknown
19:08
created

AMQPChannel::exchange_unbind()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 25
Code Lines 18

Duplication

Lines 25
Ratio 100 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 25
loc 25
ccs 0
cts 15
cp 0
rs 8.8571
c 0
b 0
f 0
cc 1
eloc 18
nc 1
nop 6
crap 2
1
<?php
2
namespace PhpAmqpLib\Channel;
3
4
use PhpAmqpLib\Exception\AMQPBasicCancelException;
5
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
6
use PhpAmqpLib\Exception\AMQPRuntimeException;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use PhpAmqpLib\Wire\AMQPReader;
9
use PhpAmqpLib\Wire\AMQPWriter;
10
11
class AMQPChannel extends AbstractChannel
12
{
13
    /** @var array */
14
    public $callbacks = array();
15
16
    /** @var bool Whether or not the channel has been "opened" */
17
    protected $is_open = false;
18
19
    /** @var int */
20
    protected $default_ticket;
21
22
    /** @var bool */
23
    protected $active;
24
25
    /** @var array */
26
    protected $alerts;
27
28
    /** @var bool */
29
    protected $auto_decode;
30
31
    /**
32
     * These parameters will be passed to function in case of basic_return:
33
     *    param int $reply_code
34
     *    param string $reply_text
35
     *    param string $exchange
36
     *    param string $routing_key
37
     *    param AMQPMessage $msg
38
     *
39
     * @var callable
40
     */
41
    protected $basic_return_callback;
42
43
    /** @var array Used to keep track of the messages that are going to be batch published. */
44
    protected $batch_messages = array();
45
46
    /**
47
     * If the channel is in confirm_publish mode this array will store all published messages
48
     * until they get ack'ed or nack'ed
49
     *
50
     * @var AMQPMessage[]
51
     */
52
    private $published_messages = array();
53
54
    /** @var int */
55
    private $next_delivery_tag = 0;
56
57
    /** @var callable */
58
    private $ack_handler;
59
60
    /** @var callable */
61
    private $nack_handler;
62
63
    /**
64
     * Circular buffer to speed up both basic_publish() and publish_batch().
65
     * Max size limited by $publish_cache_max_size.
66
     *
67
     * @var array
68
     * @see basic_publish()
69
     * @see publish_batch()
70
     */
71
    private $publish_cache;
72
73
    /**
74
     * Maximal size of $publish_cache
75
     *
76
     * @var int
77
     */
78
    private $publish_cache_max_size;
79
80
    /**
81
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
82
     * @param null $channel_id
83
     * @param bool $auto_decode
84
     * @throws \Exception
85
     */
86 84
    public function __construct($connection, $channel_id = null, $auto_decode = true)
87
    {
88 84
        if ($channel_id == null) {
89
            $channel_id = $connection->get_free_channel_id();
90
        }
91
92 84
        parent::__construct($connection, $channel_id);
93
94 84
        $this->publish_cache = array();
95 84
        $this->publish_cache_max_size = 100;
96
97 84
        $this->debug->debug_msg('using channel_id: ' . $channel_id);
98
99 84
        $this->default_ticket = 0;
100 84
        $this->is_open = false;
101 84
        $this->active = true; // Flow control
102 84
        $this->alerts = array();
103 84
        $this->callbacks = array();
104 84
        $this->auto_decode = $auto_decode;
105
106
        try {
107 84
            $this->x_open();
108 56
        } catch (\Exception $e) {
109
            $this->close();
110
            throw $e;
111
        }
112 84
    }
113
114
    /**
115
     * Tear down this object, after we've agreed to close with the server.
116
     */
117 90
    protected function do_close()
118
    {
119 90
        if ($this->channel_id !== null) {
120 84
            unset($this->connection->channels[$this->channel_id]);
121 56
        }
122 90
        $this->channel_id = $this->connection = null;
123 90
        $this->is_open = false;
124 90
    }
125
126
    /**
127
     * Only for AMQP0.8.0
128
     * This method allows the server to send a non-fatal warning to
129
     * the client.  This is used for methods that are normally
130
     * asynchronous and thus do not have confirmations, and for which
131
     * the server may detect errors that need to be reported.  Fatal
132
     * errors are handled as channel or connection exceptions; non-
133
     * fatal errors are sent through this method.
134
     *
135
     * @param AMQPReader $reader
136
     */
137
    protected function channel_alert($reader)
138
    {
139
        $reply_code = $reader->read_short();
140
        $reply_text = $reader->read_shortstr();
141
        $details = $reader->read_table();
142
        array_push($this->alerts, array($reply_code, $reply_text, $details));
143
    }
144
145
    /**
146
     * Request a channel close
147
     *
148
     * @param int $reply_code
149
     * @param string $reply_text
150
     * @param array $method_sig
151
     * @return mixed
152
     */
153 90
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
154
    {
155 90
        if ($this->is_open === false || $this->connection === null) {
156 11
            $this->do_close();
157
158 11
            return null; // already closed
159
        }
160 84
        list($class_id, $method_id, $args) = $this->protocolWriter->channelClose(
161 84
            $reply_code,
162 84
            $reply_text,
163 84
            $method_sig[0],
164 84
            $method_sig[1]
165 56
        );
166
167 84
        $this->send_method_frame(array($class_id, $method_id), $args);
168
169 84
        return $this->wait(array(
170 84
            $this->waitHelper->get_wait('channel.close_ok')
171 56
        ));
172
    }
173
174
    /**
175
     * @param AMQPReader $reader
176
     * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException
177
     */
178 12
    protected function channel_close($reader)
179
    {
180 12
        $reply_code = $reader->read_short();
181 12
        $reply_text = $reader->read_shortstr();
182 12
        $class_id = $reader->read_short();
183 12
        $method_id = $reader->read_short();
184
185 12
        $this->send_method_frame(array(20, 41));
186 12
        $this->do_close();
187
188 12
        throw new AMQPProtocolChannelException($reply_code, $reply_text, array($class_id, $method_id));
189
    }
190
191
    /**
192
     * Confirm a channel close
193
     * Alias of AMQPChannel::do_close()
194
     *
195
     * @param AMQPReader $reader
196
     */
197 84
    protected function channel_close_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
198
    {
199 84
        $this->do_close();
200 84
    }
201
202
    /**
203
     * Enables/disables flow from peer
204
     *
205
     * @param $active
206
     * @return mixed
207
     */
208
    public function flow($active)
209
    {
210
        list($class_id, $method_id, $args) = $this->protocolWriter->channelFlow($active);
211
        $this->send_method_frame(array($class_id, $method_id), $args);
212
213
        return $this->wait(array(
214
            $this->waitHelper->get_wait('channel.flow_ok')
215
        ));
216
    }
217
218
    /**
219
     * @param AMQPReader $reader
220
     */
221
    protected function channel_flow($reader)
222
    {
223
        $this->active = $reader->read_bit();
224
        $this->x_flow_ok($this->active);
225
    }
226
227
    /**
228
     * @param bool $active
229
     */
230
    protected function x_flow_ok($active)
231
    {
232
        list($class_id, $method_id, $args) = $this->protocolWriter->channelFlow($active);
233
        $this->send_method_frame(array($class_id, $method_id), $args);
234
    }
235
236
    /**
237
     * @param AMQPReader $reader
238
     * @return bool
239
     */
240
    protected function channel_flow_ok($reader)
241
    {
242
        return $reader->read_bit();
243
    }
244
245
    /**
246
     * @param string $out_of_band
247
     * @return mixed
248
     */
249 84 View Code Duplication
    protected function x_open($out_of_band = '')
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
250
    {
251 84
        if ($this->is_open) {
252
            return null;
253
        }
254
255 84
        list($class_id, $method_id, $args) = $this->protocolWriter->channelOpen($out_of_band);
256 84
        $this->send_method_frame(array($class_id, $method_id), $args);
257
258 84
        return $this->wait(array(
259 84
            $this->waitHelper->get_wait('channel.open_ok')
260 56
        ));
261
    }
262
263
    /**
264
     * @param AMQPReader $reader
265
     */
266 84
    protected function channel_open_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
267
    {
268 84
        $this->is_open = true;
269
270 84
        $this->debug->debug_msg('Channel open');
271 84
    }
272
273
    /**
274
     * Requests an access ticket
275
     *
276
     * @param string $realm
277
     * @param bool $exclusive
278
     * @param bool $passive
279
     * @param bool $active
280
     * @param bool $write
281
     * @param bool $read
282
     * @return mixed
283
     */
284
    public function access_request(
285
        $realm,
286
        $exclusive = false,
287
        $passive = false,
288
        $active = false,
289
        $write = false,
290
        $read = false
291
    ) {
292
        list($class_id, $method_id, $args) = $this->protocolWriter->accessRequest(
293
            $realm,
294
            $exclusive,
295
            $passive,
296
            $active,
297
            $write,
298
            $read
299
        );
300
301
        $this->send_method_frame(array($class_id, $method_id), $args);
302
303
        return $this->wait(array(
304
            $this->waitHelper->get_wait('access.request_ok')
305
        ));
306
    }
307
308
    /**
309
     * Grants access to server resources
310
     *
311
     * @param AMQPReader $reader
312
     * @return string
313
     */
314
    protected function access_request_ok($reader)
315
    {
316
        $this->default_ticket = $reader->read_short();
317
318
        return $this->default_ticket;
319
    }
320
321
    /**
322
     * Declares exchange
323
     *
324
     * @param string $exchange
325
     * @param string $type
326
     * @param bool $passive
327
     * @param bool $durable
328
     * @param bool $auto_delete
329
     * @param bool $internal
330
     * @param bool $nowait
331
     * @param array $arguments
332
     * @param int $ticket
333
     * @return mixed|null
334
     */
335 78 View Code Duplication
    public function exchange_declare(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
336
        $exchange,
337
        $type,
338
        $passive = false,
339
        $durable = false,
340
        $auto_delete = true,
341
        $internal = false,
342
        $nowait = false,
343
        $arguments = array(),
344
        $ticket = null
345
    ) {
346 78
        $ticket = $this->getTicket($ticket);
347 78
348
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeDeclare(
349 78
            $ticket,
350 78
            $exchange,
351 78
            $type,
352 78
            $passive,
353 78
            $durable,
354 78
            $auto_delete,
355 78
            $internal,
356 78
            $nowait,
357 78
            $arguments
358 26
        );
359 52
360
        $this->send_method_frame(array($class_id, $method_id), $args);
361 78
362
        if ($nowait) {
363 78
            return null;
364
        }
365
366
        return $this->wait(array(
367 78
            $this->waitHelper->get_wait('exchange.declare_ok')
368 78
        ));
369 52
    }
370
371
    /**
372
     * Confirms an exchange declaration
373
     * @param AMQPReader $reader
374
     */
375
    protected function exchange_declare_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
376 77
    {
377
    }
378 77
379
    /**
380
     * Deletes an exchange
381
     *
382
     * @param string $exchange
383
     * @param bool $if_unused
384
     * @param bool $nowait
385
     * @param null $ticket
386
     * @return mixed|null
387
     */
388 View Code Duplication
    public function exchange_delete(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
389 78
        $exchange,
390
        $if_unused = false,
391
        $nowait = false,
392
        $ticket = null
393
    ) {
394
        $ticket = $this->getTicket($ticket);
395 78
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeDelete(
396 78
            $ticket,
397 78
            $exchange,
398 78
            $if_unused,
399 78
            $nowait
400 26
        );
401 52
402
        $this->send_method_frame(array($class_id, $method_id), $args);
403 78
404
        if ($nowait) {
405 78
            return null;
406
        }
407
408
        return $this->wait(array(
409 78
            $this->waitHelper->get_wait('exchange.delete_ok')
410 78
        ));
411 52
    }
412
413
    /**
414
     * Confirms deletion of an exchange
415
     *
416
     * @param AMQPReader $reader
417
     */
418
    protected function exchange_delete_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
419 78
    {
420
    }
421 78
422
    /**
423
     * Binds dest exchange to source exchange
424
     *
425
     * @param string $destination
426
     * @param string $source
427
     * @param string $routing_key
428
     * @param bool $nowait
429
     * @param array $arguments
430
     * @param int $ticket
431
     * @return mixed|null
432
     */
433 View Code Duplication
    public function exchange_bind(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
434
        $destination,
435
        $source,
436
        $routing_key = '',
437
        $nowait = false,
438
        $arguments = array(),
439
        $ticket = null
440
    ) {
441
        $ticket = $this->getTicket($ticket);
442
443
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeBind(
444
            $ticket,
445
            $destination,
446
            $source,
447
            $routing_key,
448
            $nowait,
449
            $arguments
450
        );
451
452
        $this->send_method_frame(array($class_id, $method_id), $args);
453
454
        if ($nowait) {
455
            return null;
456
        }
457
458
        return $this->wait(array(
459
            $this->waitHelper->get_wait('exchange.bind_ok')
460
        ));
461
    }
462
463
    /**
464
     * Confirms bind successful
465
     * @param AMQPReader $reader
466
     */
467
    protected function exchange_bind_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
468
    {
469
    }
470
471
    /**
472
     * Unbinds dest exchange from source exchange
473
     *
474
     * @param string $destination
475
     * @param string $source
476
     * @param string $routing_key
477
     * @param bool $nowait
478
     * @param array $arguments
479
     * @param int $ticket
480
     * @return mixed
481
     */
482 View Code Duplication
    public function exchange_unbind(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
483
        $destination,
484
        $source,
485
        $routing_key = '',
486
        $nowait = false,
487
        $arguments = array(),
488
        $ticket = null
489
    ) {
490
        $ticket = $this->getTicket($ticket);
491
492
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(
493
            $ticket,
494
            $destination,
495
            $source,
496
            $routing_key,
497
            $nowait,
498
            $arguments
499
        );
500
501
        $this->send_method_frame(array($class_id, $method_id), $args);
502
503
        return $this->wait(array(
504
            $this->waitHelper->get_wait('exchange.unbind_ok')
505
        ));
506
    }
507
508
    /**
509
     * Confirms unbind successful
510
     *
511
     * @param AMQPReader $reader
512
     */
513
    protected function exchange_unbind_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
514
    {
515
    }
516
517
    /**
518
     * Binds queue to an exchange
519
     *
520
     * @param string $queue
521
     * @param string $exchange
522
     * @param string $routing_key
523
     * @param bool $nowait
524
     * @param array $arguments
525 77
     * @param int $ticket
526
     * @return mixed|null
527 77
     */
528 77 View Code Duplication
    public function queue_bind(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
529
        $queue,
530 77
        $exchange,
531 77
        $routing_key = '',
532 77
        $nowait = false,
533 77
        $arguments = array(),
534 77
        $ticket = null
535 77
    ) {
536 25
        $ticket = $this->getTicket($ticket);
537 52
538
        list($class_id, $method_id, $args) = $this->protocolWriter->queueBind(
539 77
            $ticket,
540
            $queue,
541 77
            $exchange,
542
            $routing_key,
543
            $nowait,
544
            $arguments
545 77
        );
546 77
547 52
        $this->send_method_frame(array($class_id, $method_id), $args);
548
549
        if ($nowait) {
550
            return null;
551
        }
552
553
        return $this->wait(array(
554
            $this->waitHelper->get_wait('queue.bind_ok')
555 77
        ));
556
    }
557 77
558
    /**
559
     * Confirms bind successful
560
     *
561
     * @param AMQPReader $reader
562
     */
563
    protected function queue_bind_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
564
    {
565
    }
566
567
    /**
568
     * Unbind queue from an exchange
569
     *
570
     * @param string $queue
571
     * @param string $exchange
572
     * @param string $routing_key
573
     * @param array $arguments
574
     * @param int $ticket
575
     * @return mixed
576
     */
577
    public function queue_unbind(
578
        $queue,
579
        $exchange,
580
        $routing_key = '',
581
        $arguments = array(),
582
        $ticket = null
583
    ) {
584
        $ticket = $this->getTicket($ticket);
585
586
        list($class_id, $method_id, $args) = $this->protocolWriter->queueUnbind(
587
            $ticket,
588
            $queue,
589
            $exchange,
590
            $routing_key,
591
            $arguments
592
        );
593
594
        $this->send_method_frame(array($class_id, $method_id), $args);
595
596
        return $this->wait(array(
597
            $this->waitHelper->get_wait('queue.unbind_ok')
598
        ));
599
    }
600
601
    /**
602
     * Confirms unbind successful
603
     * @param AMQPReader $reader
604
     */
605
    protected function queue_unbind_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
606
    {
607
    }
608
609
    /**
610 83
     * Declares queue, creates if needed
611
     *
612
     * @param string $queue
613
     * @param bool $passive
614
     * @param bool $durable
615
     * @param bool $exclusive
616
     * @param bool $auto_delete
617
     * @param bool $nowait
618
     * @param array $arguments
619
     * @param int $ticket
620 83
     * @return mixed|null
621 83
     */
622 View Code Duplication
    public function queue_declare(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
623 83
        $queue = '',
624 83
        $passive = false,
625 83
        $durable = false,
626 83
        $exclusive = false,
627 83
        $auto_delete = true,
628 83
        $nowait = false,
629 83
        $arguments = array(),
630 83
        $ticket = null
631 27
    ) {
632 56
        $ticket = $this->getTicket($ticket);
633
634 83
        list($class_id, $method_id, $args) = $this->protocolWriter->queueDeclare(
635
            $ticket,
636 83
            $queue,
637
            $passive,
638
            $durable,
639
            $exclusive,
640 83
            $auto_delete,
641 83
            $nowait,
642 56
            $arguments
643
        );
644
645
        $this->send_method_frame(array($class_id, $method_id), $args);
646
647
        if ($nowait) {
648
            return null;
649
        }
650
651 83
        return $this->wait(array(
652
            $this->waitHelper->get_wait('queue.declare_ok')
653 83
        ));
654 83
    }
655 83
656
    /**
657 83
     * Confirms a queue definition
658
     *
659
     * @param AMQPReader $reader
660
     * @return string[]
661
     */
662
    protected function queue_declare_ok($reader)
663
    {
664
        $queue = $reader->read_shortstr();
665
        $message_count = $reader->read_long();
666
        $consumer_count = $reader->read_long();
667
668
        return array($queue, $message_count, $consumer_count);
669
    }
670 60
671
    /**
672 60
     * Deletes a queue
673
     *
674 60
     * @param string $queue
675 60
     * @param bool $if_unused
676 60
     * @param bool $if_empty
677 60
     * @param bool $nowait
678 60
     * @param int $ticket
679 20
     * @return mixed|null
680 40
     */
681
    public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
682 60
    {
683
        $ticket = $this->getTicket($ticket);
684 60
685
        list($class_id, $method_id, $args) = $this->protocolWriter->queueDelete(
686
            $ticket,
687
            $queue,
688 60
            $if_unused,
689 60
            $if_empty,
690 40
            $nowait
691
        );
692
693
        $this->send_method_frame(array($class_id, $method_id), $args);
694
695
        if ($nowait) {
696
            return null;
697
        }
698
699 60
        return $this->wait(array(
700
            $this->waitHelper->get_wait('queue.delete_ok')
701 60
        ));
702
    }
703
704
    /**
705
     * Confirms deletion of a queue
706
     *
707
     * @param AMQPReader $reader
708
     * @return string
709
     */
710
    protected function queue_delete_ok($reader)
711
    {
712
        return $reader->read_long();
713
    }
714
715
    /**
716
     * Purges a queue
717
     *
718
     * @param string $queue
719
     * @param bool $nowait
720
     * @param int $ticket
721
     * @return mixed|null
722
     */
723 View Code Duplication
    public function queue_purge($queue = '', $nowait = false, $ticket = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
724
    {
725
        $ticket = $this->getTicket($ticket);
726
        list($class_id, $method_id, $args) = $this->protocolWriter->queuePurge($ticket, $queue, $nowait);
727
728
        $this->send_method_frame(array($class_id, $method_id), $args);
729
730
        if ($nowait) {
731
            return null;
732
        }
733
734
        return $this->wait(array(
735
            $this->waitHelper->get_wait('queue.purge_ok')
736
        ));
737
    }
738
739
    /**
740
     * Confirms a queue purge
741
     *
742
     * @param AMQPReader $reader
743
     * @return string
744
     */
745 12
    protected function queue_purge_ok($reader)
746
    {
747 12
        return $reader->read_long();
748 12
    }
749 12
750
    /**
751
     * Acknowledges one or more messages
752
     *
753
     * @param string $delivery_tag
754
     * @param bool $multiple
755
     */
756
    public function basic_ack($delivery_tag, $multiple = false)
757 6
    {
758
        list($class_id, $method_id, $args) = $this->protocolWriter->basicAck($delivery_tag, $multiple);
759 6
        $this->send_method_frame(array($class_id, $method_id), $args);
760 6
    }
761
762 6
    /**
763
     * Called when the server sends a basic.ack
764
     *
765
     * @param AMQPReader $reader
766
     * @throws AMQPRuntimeException
767
     */
768 View Code Duplication
    protected function basic_ack_from_server(AMQPReader $reader)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
769 6
    {
770 6
        $delivery_tag = $reader->read_longlong();
771
        $multiple = (bool) $reader->read_bit();
772
773
        if (!isset($this->published_messages[$delivery_tag])) {
774
            throw new AMQPRuntimeException(sprintf(
775
                'Server ack\'ed unknown delivery_tag "%s"',
776
                $delivery_tag
777
            ));
778
        }
779
780
        $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);
781
    }
782
783
    /**
784
     * Called when the server sends a basic.nack
785
     *
786
     * @param AMQPReader $reader
787
     * @throws AMQPRuntimeException
788
     */
789 View Code Duplication
    protected function basic_nack_from_server($reader)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
790
    {
791
        $delivery_tag = $reader->read_longlong();
792
        $multiple = (bool) $reader->read_bit();
793
794
        if (!isset($this->published_messages[$delivery_tag])) {
795
            throw new AMQPRuntimeException(sprintf(
796
                'Server nack\'ed unknown delivery_tag "%s"',
797
                $delivery_tag
798
            ));
799
        }
800 6
801
        $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);
802 6
    }
803
804
    /**
805
     * Handles the deletion of messages from this->publishedMessages and dispatches them to the $handler
806
     *
807
     * @param string $delivery_tag
808
     * @param bool $multiple
809
     * @param callable $handler
810 6
     */
811 6
    protected function internal_ack_handler($delivery_tag, $multiple, $handler)
812 6
    {
813
        if ($multiple) {
814 6
            $keys = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag);
815
816
            foreach ($keys as $key) {
817
                $this->internal_ack_handler($key, false, $handler);
818
            }
819
820
        } else {
821
            $message = $this->get_and_unset_message($delivery_tag);
822
            $message->delivery_info['delivery_tag'] = $delivery_tag;
823
            $this->dispatch_to_handler($handler, array($message));
824
        }
825
    }
826
827
    /**
828
     * @param AMQPMessage[] $messages
829
     * @param string $value
830
     * @return mixed
831
     */
832
    protected function get_keys_less_or_equal(array $messages, $value)
833
    {
834
        $keys = array_reduce(
835
            array_keys($messages),
836
837
            /**
838
             * @param string $key
839
             */
840
            function ($keys, $key) use ($value) {
841
                if (bccomp($key, $value, 0) <= 0) {
842
                    $keys[] = $key;
843
                }
844
845
                return $keys;
846
            },
847
            array()
848
        );
849
850
        return $keys;
851
    }
852
853
    /**
854
     * Rejects one or several received messages
855
     *
856
     * @param string $delivery_tag
857
     * @param bool $multiple
858
     * @param bool $requeue
859
     */
860
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
861
    {
862
        list($class_id, $method_id, $args) = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue);
863 23
        $this->send_method_frame(array($class_id, $method_id), $args);
864
    }
865 23
866 23
    /**
867
     * Ends a queue consumer
868 23
     *
869
     * @param string $consumer_tag
870
     * @param bool $nowait
871
     * @param bool $noreturn
872
     * @return mixed
873 23
     */
874 23 View Code Duplication
    public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
875 16
    {
876
        list($class_id, $method_id, $args) = $this->protocolWriter->basicCancel($consumer_tag, $nowait);
877
        $this->send_method_frame(array($class_id, $method_id), $args);
878
879
        if ($nowait || $noreturn) {
880
            unset($this->callbacks[$consumer_tag]);
881
            return $consumer_tag;
882
        }
883
884
        return $this->wait(array(
885
            $this->waitHelper->get_wait('basic.cancel_ok')
886
        ));
887
    }
888
889
    /**
890
     * @param AMQPReader $reader
891
     * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException
892
     */
893 23
    protected function basic_cancel_from_server(AMQPReader $reader)
894
    {
895 23
        throw new AMQPBasicCancelException($reader->read_shortstr());
896 23
    }
897
898 23
    /**
899
     * Confirm a cancelled consumer
900
     *
901
     * @param AMQPReader $reader
902
     * @return string
903
     */
904
    protected function basic_cancel_ok($reader)
905
    {
906
        $consumerTag = $reader->read_shortstr();
907
        unset($this->callbacks[$consumerTag]);
908
909
        return $consumerTag;
910
    }
911
912
    /**
913
     * Starts a queue consumer
914
     *
915 23
     * @param string $queue
916
     * @param string $consumer_tag
917
     * @param bool $no_local
918
     * @param bool $no_ack
919
     * @param bool $exclusive
920
     * @param bool $nowait
921
     * @param callable|null $callback
922
     * @param int|null $ticket
923
     * @param array $arguments
924
     * @return mixed|string
925
     */
926 23
    public function basic_consume(
927 23
        $queue = '',
928 23
        $consumer_tag = '',
929 23
        $no_local = false,
930 23
        $no_ack = false,
931 23
        $exclusive = false,
932 23
        $nowait = false,
933 23
        $callback = null,
934 23
        $ticket = null,
935 23
        $arguments = array()
936 16
    ) {
937
        $ticket = $this->getTicket($ticket);
938 23
        list($class_id, $method_id, $args) = $this->protocolWriter->basicConsume(
939
            $ticket,
940 23
            $queue,
941 23
            $consumer_tag,
942 23
            $no_local,
943 16
            $no_ack,
944 16
            $exclusive,
945
            $nowait,
946 23
            $this->protocolVersion == '0.9.1' ? $arguments : null
947
        );
948 23
949
        $this->send_method_frame(array($class_id, $method_id), $args);
950
951
        if (false === $nowait) {
952
            $consumer_tag = $this->wait(array(
953
                $this->waitHelper->get_wait('basic.consume_ok')
954
            ));
955
        }
956
957 23
        $this->callbacks[$consumer_tag] = $callback;
958
959 23
        return $consumer_tag;
960
    }
961
962
    /**
963
     * Confirms a new consumer
964
     *
965
     * @param AMQPReader $reader
966
     * @return string
967
     */
968 23
    protected function basic_consume_ok($reader)
969
    {
970 23
        return $reader->read_shortstr();
971 23
    }
972 23
973 23
    /**
974 23
     * Notifies the client of a consumer message
975
     *
976 23
     * @param AMQPReader $reader
977 23
     * @param AMQPMessage $message
978 23
     */
979 23
    protected function basic_deliver($reader, $message)
980 23
    {
981 23
        $consumer_tag = $reader->read_shortstr();
982 7
        $delivery_tag = $reader->read_longlong();
983 16
        $redelivered = $reader->read_bit();
984
        $exchange = $reader->read_shortstr();
985 23
        $routing_key = $reader->read_shortstr();
986 23
987 12
        $message->delivery_info = array(
988 17
            'channel' => $this,
989
            'consumer_tag' => $consumer_tag,
990
            'delivery_tag' => $delivery_tag,
991
            'redelivered' => $redelivered,
992
            'exchange' => $exchange,
993
            'routing_key' => $routing_key
994
        );
995
996
        if (isset($this->callbacks[$consumer_tag])) {
997
            call_user_func($this->callbacks[$consumer_tag], $message);
998 42
        }
999
    }
1000 42
1001 42
    /**
1002
     * Direct access to a queue if no message was available in the queue, return null
1003 42
     *
1004
     * @param string $queue
1005 42
     * @param bool $no_ack
1006 42
     * @param int $ticket
1007 42
     * @return mixed
1008 28
     */
1009
    public function basic_get($queue = '', $no_ack = false, $ticket = null)
1010
    {
1011
        $ticket = $this->getTicket($ticket);
1012
        list($class_id, $method_id, $args) = $this->protocolWriter->basicGet($ticket, $queue, $no_ack);
1013
1014
        $this->send_method_frame(array($class_id, $method_id), $args);
1015
1016
        return $this->wait(array(
1017
            $this->waitHelper->get_wait('basic.get_ok'),
1018
            $this->waitHelper->get_wait('basic.get_empty')
1019
        ));
1020
    }
1021
1022
    /**
1023
     * Indicates no messages available
1024
     *
1025
     * @param AMQPReader $reader
1026
     */
1027 42
    protected function basic_get_empty($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1028
    {
1029 42
    }
1030 42
1031 42
    /**
1032 42
     * Provides client with a message
1033 42
     *
1034
     * @param AMQPReader $reader
1035 42
     * @param AMQPMessage $message
1036 42
     * @return AMQPMessage
1037 42
     */
1038 42
    protected function basic_get_ok($reader, $message)
1039 42
    {
1040 14
        $delivery_tag = $reader->read_longlong();
1041 28
        $redelivered = $reader->read_bit();
1042
        $exchange = $reader->read_shortstr();
1043 42
        $routing_key = $reader->read_shortstr();
1044
        $message_count = $reader->read_long();
1045
1046
        $message->delivery_info = array(
1047
            'delivery_tag' => $delivery_tag,
1048
            'redelivered' => $redelivered,
1049
            'exchange' => $exchange,
1050
            'routing_key' => $routing_key,
1051
            'message_count' => $message_count
1052
        );
1053
1054 71
        return $message;
1055
    }
1056 71
1057 71
    /**
1058 71
     * @param string $exchange
1059 71
     * @param string $routing_key
1060 71
     * @param $mandatory
1061 71
     * @param $immediate
1062 23
     * @param int $ticket
1063 48
     * @return mixed
1064 71
     */
1065 71
    private function pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket)
1066 71
    {
1067 71
        $cache_key = sprintf(
1068 71
            '%s|%s|%s|%s|%s',
1069 71
            $exchange,
1070 71
            $routing_key,
1071 23
            $mandatory,
1072 48
            $immediate,
1073
            $ticket
1074 71
        );
1075 71
        if (false === isset($this->publish_cache[$cache_key])) {
1076 71
            $ticket = $this->getTicket($ticket);
1077
            list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish(
1078
                $ticket,
1079
                $exchange,
1080
                $routing_key,
1081 48
                $mandatory,
1082
                $immediate
1083 71
            );
1084
1085
            $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args);
1086
            $this->publish_cache[$cache_key] = $pkt->getvalue();
1087
            if (count($this->publish_cache) > $this->publish_cache_max_size) {
1088
                reset($this->publish_cache);
1089
                $old_key = key($this->publish_cache);
1090
                unset($this->publish_cache[$old_key]);
1091
            }
1092
        }
1093
1094
        return $this->publish_cache[$cache_key];
1095
    }
1096 71
1097
    /**
1098
     * Publishes a message
1099
     *
1100
     * @param AMQPMessage $msg
1101
     * @param string $exchange
1102
     * @param string $routing_key
1103
     * @param bool $mandatory
1104 71
     * @param bool $immediate
1105 71
     * @param int $ticket
1106
     */
1107 71
    public function basic_publish(
1108 71
        $msg,
1109 71
        $exchange = '',
1110 71
        $routing_key = '',
1111 71
        $mandatory = false,
1112 71
        $immediate = false,
1113 71
        $ticket = null
1114 23
    ) {
1115 48
        $pkt = new AMQPWriter();
1116
        $pkt->write($this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket));
1117 71
1118 6
        $this->connection->send_content(
1119 6
            $this->channel_id,
1120 4
            60,
1121 71
            0,
1122
            mb_strlen($msg->body, 'ASCII'),
1123
            $msg->serialize_properties(),
1124
            $msg->body,
1125
            $pkt
1126
        );
1127
1128 View Code Duplication
        if ($this->next_delivery_tag > 0) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1129
            $this->published_messages[$this->next_delivery_tag] = $msg;
1130
            $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
0 ignored issues
show
Documentation Bug introduced by
The property $next_delivery_tag was declared of type integer, but bcadd($this->next_delivery_tag, '1', 0) is of type string. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
1131
        }
1132
    }
1133
1134
    /**
1135
     * @param AMQPMessage $msg
1136
     * @param string $exchange
1137
     * @param string $routing_key
1138
     * @param bool $mandatory
1139
     * @param bool $immediate
1140
     * @param int $ticket
1141
     */
1142
    public function batch_basic_publish(
1143
        $msg,
0 ignored issues
show
Unused Code introduced by
The parameter $msg is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1144
        $exchange = '',
0 ignored issues
show
Unused Code introduced by
The parameter $exchange is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1145
        $routing_key = '',
0 ignored issues
show
Unused Code introduced by
The parameter $routing_key is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1146
        $mandatory = false,
0 ignored issues
show
Unused Code introduced by
The parameter $mandatory is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1147
        $immediate = false,
0 ignored issues
show
Unused Code introduced by
The parameter $immediate is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1148
        $ticket = null
0 ignored issues
show
Unused Code introduced by
The parameter $ticket is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1149
    ) {
1150
        $this->batch_messages[] = func_get_args();
1151
    }
1152
1153
    /**
1154
     * Publish batch
1155
     *
1156
     * @return void
1157
     */
1158
    public function publish_batch()
1159
    {
1160
        if (empty($this->batch_messages)) {
1161
            return null;
1162
        }
1163
1164
        /** @var AMQPWriter $pkt */
1165
        $pkt = new AMQPWriter();
1166
1167
        /** @var AMQPMessage $msg */
1168
        foreach ($this->batch_messages as $m) {
1169
            $msg = $m[0];
1170
1171
            $exchange = isset($m[1]) ? $m[1] : '';
1172
            $routing_key = isset($m[2]) ? $m[2] : '';
1173
            $mandatory = isset($m[3]) ? $m[3] : false;
1174
            $immediate = isset($m[4]) ? $m[4] : false;
1175
            $ticket = isset($m[5]) ? $m[5] : null;
1176
            $pkt->write($this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket));
1177
1178
            $this->connection->prepare_content(
1179
                $this->channel_id,
1180
                60,
1181
                0,
1182
                mb_strlen($msg->body, 'ASCII'),
1183
                $msg->serialize_properties(),
1184
                $msg->body,
1185
                $pkt
1186
            );
1187
1188 View Code Duplication
            if ($this->next_delivery_tag > 0) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1189
                $this->published_messages[$this->next_delivery_tag] = $msg;
1190
                $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
0 ignored issues
show
Documentation Bug introduced by
The property $next_delivery_tag was declared of type integer, but bcadd($this->next_delivery_tag, '1', 0) is of type string. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
1191
            }
1192
        }
1193
1194
        //call write here
1195
        $this->connection->write($pkt->getvalue());
1196
        $this->batch_messages = array();
1197
    }
1198
1199
    /**
1200
     * Specifies QoS
1201
     *
1202
     * @param int $prefetch_size
1203
     * @param int $prefetch_count
1204
     * @param bool $a_global
1205
     * @return mixed
1206
     */
1207
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)
1208
    {
1209
        list($class_id, $method_id, $args) = $this->protocolWriter->basicQos(
1210
            $prefetch_size,
1211
            $prefetch_count,
1212
            $a_global
1213
        );
1214
1215
        $this->send_method_frame(array($class_id, $method_id), $args);
1216
1217
        return $this->wait(array(
1218
            $this->waitHelper->get_wait('basic.qos_ok')
1219
        ));
1220
    }
1221
1222
    /**
1223
     * Confirms QoS request
1224
     * @param AMQPReader $reader
1225
     */
1226
    protected function basic_qos_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1227
    {
1228
    }
1229
1230
    /**
1231
     * Redelivers unacknowledged messages
1232
     *
1233
     * @param bool $requeue
1234
     * @return mixed
1235
     */
1236
    public function basic_recover($requeue = false)
1237
    {
1238
        list($class_id, $method_id, $args) = $this->protocolWriter->basicRecover($requeue);
1239
        $this->send_method_frame(array($class_id, $method_id), $args);
1240
1241
        return $this->wait(array(
1242
            $this->waitHelper->get_wait('basic.recover_ok')
1243
        ));
1244
    }
1245
1246
    /**
1247
     * Confirm the requested recover
1248
     * @param AMQPReader $reader
1249
     */
1250
    protected function basic_recover_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1251
    {
1252
    }
1253
1254
    /**
1255
     * Rejects an incoming message
1256
     *
1257
     * @param string $delivery_tag
1258
     * @param bool $requeue
1259
     */
1260
    public function basic_reject($delivery_tag, $requeue)
1261
    {
1262
        list($class_id, $method_id, $args) = $this->protocolWriter->basicReject($delivery_tag, $requeue);
1263
        $this->send_method_frame(array($class_id, $method_id), $args);
1264
    }
1265
1266
    /**
1267
     * Returns a failed message
1268
     *
1269
     * @param AMQPReader $reader
1270
     * @param AMQPMessage $message
1271
     * @return null
1272
     */
1273
    protected function basic_return($reader, $message)
1274
    {
1275
        $callback = $this->basic_return_callback;
1276
        if (!is_callable($callback)) {
1277
            $this->debug->debug_msg('Skipping unhandled basic_return message');
1278
            return null;
1279
        }
1280
1281
        $reply_code = $reader->read_short();
1282
        $reply_text = $reader->read_shortstr();
1283
        $exchange = $reader->read_shortstr();
1284
        $routing_key = $reader->read_shortstr();
1285
1286
        call_user_func_array($callback, array(
1287
            $reply_code,
1288
            $reply_text,
1289
            $exchange,
1290
            $routing_key,
1291
            $message,
1292
        ));
1293
    }
1294
1295
    /**
1296
     * @return mixed
1297
     */
1298
    public function tx_commit()
1299
    {
1300
        $this->send_method_frame(array(90, 20));
1301
1302
        return $this->wait(array(
1303
            $this->waitHelper->get_wait('tx.commit_ok')
1304
        ));
1305
    }
1306
1307
    /**
1308
     * Confirms a successful commit
1309
     * @param AMQPReader $reader
1310
     */
1311
    protected function tx_commit_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1312
    {
1313
    }
1314
1315
    /**
1316
     * Rollbacks the current transaction
1317
     *
1318
     * @return mixed
1319
     */
1320
    public function tx_rollback()
1321
    {
1322
        $this->send_method_frame(array(90, 30));
1323
1324
        return $this->wait(array(
1325
            $this->waitHelper->get_wait('tx.rollback_ok')
1326
        ));
1327
    }
1328
1329
    /**
1330
     * Confirms a successful rollback
1331
     *
1332
     * @param AMQPReader $reader
1333
     */
1334 6
    protected function tx_rollback_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1335
    {
1336 6
    }
1337
1338 6
    /**
1339
     * Puts the channel into confirm mode
1340 6
     * Beware that only non-transactional channels may be put into confirm mode and vice versa
1341
     *
1342
     * @param bool $nowait
1343
     * @return null
1344 6
     */
1345 6 View Code Duplication
    public function confirm_select($nowait = false)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1346 6
    {
1347
        list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);
1348
1349
        $this->send_method_frame(array($class_id, $method_id), $args);
1350
1351
        if ($nowait) {
1352
            return null;
1353 6
        }
1354
1355 6
        $this->wait(array($this->waitHelper->get_wait('confirm.select_ok')));
1356
        $this->next_delivery_tag = 1;
1357
    }
1358
1359
    /**
1360
     * Confirms a selection
1361
     *
1362
     * @param AMQPReader $reader
1363
     */
1364
    public function confirm_select_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1365
    {
1366
    }
1367
1368
    /**
1369
     * Waits for pending acks and nacks from the server.
1370
     * If there are no pending acks, the method returns immediately
1371
     *
1372
     * @param int $timeout Waits until $timeout value is reached
1373
     */
1374 View Code Duplication
    public function wait_for_pending_acks($timeout = 0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1375
    {
1376
        $functions = array(
1377
            $this->waitHelper->get_wait('basic.ack'),
1378
            $this->waitHelper->get_wait('basic.nack'),
1379
        );
1380
1381
        while (count($this->published_messages) !== 0) {
1382
            if ($timeout > 0) {
1383
                $this->wait($functions, true, $timeout);
1384 6
            } else {
1385
                $this->wait($functions);
1386
            }
1387 6
        }
1388 6
    }
1389 6
1390 4
    /**
1391
     * Waits for pending acks, nacks and returns from the server. If there are no pending acks, the method returns immediately.
1392 6
     *
1393 6
     * @param int $timeout If set to value > 0 the method will wait at most $timeout seconds for pending acks.
1394 6
     */
1395 4 View Code Duplication
    public function wait_for_pending_acks_returns($timeout = 0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1396
    {
1397
        $functions = array(
1398 4
            $this->waitHelper->get_wait('basic.ack'),
1399 6
            $this->waitHelper->get_wait('basic.nack'),
1400
            $this->waitHelper->get_wait('basic.return'),
1401
        );
1402
1403
        while (count($this->published_messages) !== 0) {
1404
            if ($timeout > 0) {
1405
                $this->wait($functions, true, $timeout);
1406
            } else {
1407
                $this->wait($functions);
1408
            }
1409
        }
1410
    }
1411
1412
    /**
1413
     * Selects standard transaction mode
1414
     *
1415
     * @return mixed
1416
     */
1417
    public function tx_select()
1418
    {
1419
        $this->send_method_frame(array(90, 10));
1420
1421
        return $this->wait(array(
1422
            $this->waitHelper->get_wait('tx.select_ok')
1423
        ));
1424
    }
1425
1426
    /**
1427 84
     * Confirms transaction mode
1428
     * @param AMQPReader $reader
1429 84
     */
1430
    protected function tx_select_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1431
    {
1432
    }
1433
1434
    /**
1435
     * @param array $arguments
1436 84
     * @return array
1437
     */
1438 84
    protected function getArguments($arguments)
1439
    {
1440
        @trigger_error(sprintf(
0 ignored issues
show
Security Best Practice introduced by
It seems like you do not handle an error condition here. This can introduce security issues, and is generally not recommended.

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
Loading history...
1441
            'Method "%s" is deprecated, please use an array as a default argument instead',
1442
            __METHOD__
1443
        ), E_USER_DEPRECATED);
1444
        return (null === $arguments) ? array() : $arguments;
1445
    }
1446
1447 6
    /**
1448
     * @param int $ticket
1449 6
     * @return int
1450 6
     */
1451
    protected function getTicket($ticket)
1452 6
    {
1453
        return (null === $ticket) ? $this->default_ticket : $ticket;
1454
    }
1455
1456
    /**
1457
     * Helper method to get a particular method from $this->publishedMessages, removes it from the array and returns it.
1458
     *
1459
     * @param int $index
1460
     * @return AMQPMessage
1461
     */
1462
    protected function get_and_unset_message($index)
1463
    {
1464
        $message = $this->published_messages[$index];
1465
        unset($this->published_messages[$index]);
1466
1467
        return $message;
1468
    }
1469
1470
    /**
1471
     * Sets callback for basic_return
1472
     *
1473
     * @param  callable $callback
1474
     * @throws \InvalidArgumentException if $callback is not callable
1475
     */
1476 View Code Duplication
    public function set_return_listener($callback)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1477
    {
1478
        if (!is_callable($callback)) {
1479
            throw new \InvalidArgumentException(sprintf(
1480
                'Given callback "%s" should be callable. %s type was given.',
1481
                $callback,
1482
                gettype($callback)
1483
            ));
1484
        }
1485
1486
        $this->basic_return_callback = $callback;
1487
    }
1488
1489
    /**
1490
     * Sets a handler which called for any message nack'ed by the server, with the AMQPMessage as first argument.
1491
     *
1492
     * @param callable $callback
1493
     * @throws \InvalidArgumentException
1494
     */
1495 View Code Duplication
    public function set_nack_handler($callback)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1496
    {
1497
        if (!is_callable($callback)) {
1498
            throw new \InvalidArgumentException(sprintf(
1499 6
                'Given callback "%s" should be callable. %s type was given.',
1500
                $callback,
1501 6
                gettype($callback)
1502
            ));
1503
        }
1504
1505
        $this->nack_handler = $callback;
1506
    }
1507
1508
    /**
1509 6
     * Sets a handler which called for any message ack'ed by the server, with the AMQPMessage as first argument.
1510 6
     *
1511
     * @param callable $callback
1512
     * @throws \InvalidArgumentException
1513
     */
1514 View Code Duplication
    public function set_ack_handler($callback)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1515
    {
1516
        if (!is_callable($callback)) {
1517
            throw new \InvalidArgumentException(sprintf(
1518
                'Given callback "%s" should be callable. %s type was given.',
1519
                $callback,
1520
                gettype($callback)
1521
            ));
1522
        }
1523
1524
        $this->ack_handler = $callback;
1525
    }
1526
}
1527