Completed
Push — master ( 9b90e7...a2b649 )
by John
17:01 queued 14:42
created

PhpAmqpLib/Channel/AMQPChannel.php (23 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
namespace PhpAmqpLib\Channel;
3
4
use PhpAmqpLib\Exception\AMQPBasicCancelException;
5
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
6
use PhpAmqpLib\Exception\AMQPRuntimeException;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use PhpAmqpLib\Wire\AMQPReader;
9
use PhpAmqpLib\Wire\AMQPWriter;
10
11
class AMQPChannel extends AbstractChannel
12
{
13
    /** @var array */
14
    public $callbacks = array();
15
16
    /** @var bool Whether or not the channel has been "opened" */
17
    protected $is_open = false;
18
19
    /** @var int */
20
    protected $default_ticket;
21
22
    /** @var bool */
23
    protected $active;
24
25
    /** @var array */
26
    protected $alerts;
27
28
    /** @var bool */
29
    protected $auto_decode;
30
31
    /**
32
     * These parameters will be passed to function in case of basic_return:
33
     *    param int $reply_code
34
     *    param string $reply_text
35
     *    param string $exchange
36
     *    param string $routing_key
37
     *    param AMQPMessage $msg
38
     *
39
     * @var callable
40
     */
41
    protected $basic_return_callback;
42
43
    /** @var array Used to keep track of the messages that are going to be batch published. */
44
    protected $batch_messages = array();
45
46
    /**
47
     * If the channel is in confirm_publish mode this array will store all published messages
48
     * until they get ack'ed or nack'ed
49
     *
50
     * @var AMQPMessage[]
51
     */
52
    private $published_messages = array();
53
54
    /** @var int */
55
    private $next_delivery_tag = 0;
56
57
    /** @var callable */
58
    private $ack_handler;
59
60
    /** @var callable */
61
    private $nack_handler;
62
63
    /**
64
     * Circular buffer to speed up both basic_publish() and publish_batch().
65
     * Max size limited by $publish_cache_max_size.
66
     *
67
     * @var array
68
     * @see basic_publish()
69
     * @see publish_batch()
70
     */
71
    private $publish_cache;
72
73
    /**
74
     * Maximal size of $publish_cache
75
     *
76
     * @var int
77
     */
78
    private $publish_cache_max_size;
79
80
    /**
81
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
82
     * @param null $channel_id
83
     * @param bool $auto_decode
84
     * @throws \Exception
85
     */
86 48
    public function __construct($connection, $channel_id = null, $auto_decode = true)
87
    {
88 48
        if ($channel_id == null) {
89
            $channel_id = $connection->get_free_channel_id();
90
        }
91
92 48
        parent::__construct($connection, $channel_id);
93
94 48
        $this->publish_cache = array();
95 48
        $this->publish_cache_max_size = 100;
96
97 48
        $this->debug->debug_msg('using channel_id: ' . $channel_id);
98
99 48
        $this->default_ticket = 0;
100 48
        $this->is_open = false;
101 48
        $this->active = true; // Flow control
102 48
        $this->alerts = array();
103 48
        $this->callbacks = array();
104 48
        $this->auto_decode = $auto_decode;
105
106
        try {
107 48
            $this->x_open();
108 32
        } catch (\Exception $e) {
109
            $this->close();
110
            throw $e;
111
        }
112 48
    }
113
114
    /**
115
     * Tear down this object, after we've agreed to close with the server.
116
     */
117 54
    protected function do_close()
118
    {
119 54
        if ($this->channel_id !== null) {
120 48
            unset($this->connection->channels[$this->channel_id]);
121 32
        }
122 54
        $this->channel_id = $this->connection = null;
123 54
        $this->is_open = false;
124 54
    }
125
126
    /**
127
     * Only for AMQP0.8.0
128
     * This method allows the server to send a non-fatal warning to
129
     * the client.  This is used for methods that are normally
130
     * asynchronous and thus do not have confirmations, and for which
131
     * the server may detect errors that need to be reported.  Fatal
132
     * errors are handled as channel or connection exceptions; non-
133
     * fatal errors are sent through this method.
134
     *
135
     * @param AMQPReader $reader
136
     */
137
    protected function channel_alert($reader)
138
    {
139
        $reply_code = $reader->read_short();
140
        $reply_text = $reader->read_shortstr();
141
        $details = $reader->read_table();
142
        array_push($this->alerts, array($reply_code, $reply_text, $details));
143
    }
144
145
    /**
146
     * Request a channel close
147
     *
148
     * @param int $reply_code
149
     * @param string $reply_text
150
     * @param array $method_sig
151
     * @return mixed
152
     */
153 54
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
154
    {
155 54
        if ($this->is_open === false || $this->connection === null) {
156 12
            $this->do_close();
157
158 12
            return null; // already closed
159
        }
160 48
        list($class_id, $method_id, $args) = $this->protocolWriter->channelClose(
161 32
            $reply_code,
162 32
            $reply_text,
163 48
            $method_sig[0],
164 48
            $method_sig[1]
165 32
        );
166
167 48
        $this->send_method_frame(array($class_id, $method_id), $args);
168
169 48
        return $this->wait(array(
170 48
            $this->waitHelper->get_wait('channel.close_ok')
171 32
        ));
172
    }
173
174
    /**
175
     * @param AMQPReader $reader
176
     * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException
177
     */
178 6
    protected function channel_close($reader)
179
    {
180 6
        $reply_code = $reader->read_short();
181 6
        $reply_text = $reader->read_shortstr();
182 6
        $class_id = $reader->read_short();
183 6
        $method_id = $reader->read_short();
184
185 6
        $this->send_method_frame(array(20, 41));
186 6
        $this->do_close();
187
188 6
        throw new AMQPProtocolChannelException($reply_code, $reply_text, array($class_id, $method_id));
189
    }
190
191
    /**
192
     * Confirm a channel close
193
     * Alias of AMQPChannel::do_close()
194
     *
195
     * @param AMQPReader $reader
196
     */
197 48
    protected function channel_close_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
198
    {
199 48
        $this->do_close();
200 48
    }
201
202
    /**
203
     * Enables/disables flow from peer
204
     *
205
     * @param $active
206
     * @return mixed
207
     */
208
    public function flow($active)
209
    {
210
        list($class_id, $method_id, $args) = $this->protocolWriter->channelFlow($active);
211
        $this->send_method_frame(array($class_id, $method_id), $args);
212
213
        return $this->wait(array(
214
            $this->waitHelper->get_wait('channel.flow_ok')
215
        ));
216
    }
217
218
    /**
219
     * @param AMQPReader $reader
220
     */
221
    protected function channel_flow($reader)
222
    {
223
        $this->active = $reader->read_bit();
224
        $this->x_flow_ok($this->active);
225
    }
226
227
    /**
228
     * @param bool $active
229
     */
230
    protected function x_flow_ok($active)
231
    {
232
        list($class_id, $method_id, $args) = $this->protocolWriter->channelFlow($active);
233
        $this->send_method_frame(array($class_id, $method_id), $args);
234
    }
235
236
    /**
237
     * @param AMQPReader $reader
238
     * @return bool
239
     */
240
    protected function channel_flow_ok($reader)
241
    {
242
        return $reader->read_bit();
243
    }
244
245
    /**
246
     * @param string $out_of_band
247
     * @return mixed
248
     */
249 48 View Code Duplication
    protected function x_open($out_of_band = '')
250
    {
251 48
        if ($this->is_open) {
252
            return null;
253
        }
254
255 48
        list($class_id, $method_id, $args) = $this->protocolWriter->channelOpen($out_of_band);
256 48
        $this->send_method_frame(array($class_id, $method_id), $args);
257
258 48
        return $this->wait(array(
259 48
            $this->waitHelper->get_wait('channel.open_ok')
260 32
        ));
261
    }
262
263
    /**
264
     * @param AMQPReader $reader
265
     */
266 48
    protected function channel_open_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
267
    {
268 48
        $this->is_open = true;
269
270 48
        $this->debug->debug_msg('Channel open');
271 48
    }
272
273
    /**
274
     * Requests an access ticket
275
     *
276
     * @param string $realm
277
     * @param bool $exclusive
278
     * @param bool $passive
279
     * @param bool $active
280
     * @param bool $write
281
     * @param bool $read
282
     * @return mixed
283
     */
284
    public function access_request(
285
        $realm,
286
        $exclusive = false,
287
        $passive = false,
288
        $active = false,
289
        $write = false,
290
        $read = false
291
    ) {
292
        list($class_id, $method_id, $args) = $this->protocolWriter->accessRequest(
293
            $realm,
294
            $exclusive,
295
            $passive,
296
            $active,
297
            $write,
298
            $read
299
        );
300
301
        $this->send_method_frame(array($class_id, $method_id), $args);
302
303
        return $this->wait(array(
304
            $this->waitHelper->get_wait('access.request_ok')
305
        ));
306
    }
307
308
    /**
309
     * Grants access to server resources
310
     *
311
     * @param AMQPReader $reader
312
     * @return string
313
     */
314
    protected function access_request_ok($reader)
315
    {
316
        $this->default_ticket = $reader->read_short();
317
318
        return $this->default_ticket;
319
    }
320
321
    /**
322
     * Declares exchange
323
     *
324
     * @param string $exchange
325
     * @param string $type
326
     * @param bool $passive
327
     * @param bool $durable
328
     * @param bool $auto_delete
329
     * @param bool $internal
330
     * @param bool $nowait
331
     * @param array $arguments
332
     * @param int $ticket
333
     * @return mixed|null
334
     */
335 42 View Code Duplication
    public function exchange_declare(
336
        $exchange,
337
        $type,
338
        $passive = false,
339
        $durable = false,
340
        $auto_delete = true,
341
        $internal = false,
342
        $nowait = false,
343
        $arguments = null,
344
        $ticket = null
345
    ) {
346 42
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
347 42
        $ticket = $this->getTicket($ticket);
348
349 42
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeDeclare(
350 28
            $ticket,
351 28
            $exchange,
352 28
            $type,
353 28
            $passive,
354 28
            $durable,
355 28
            $auto_delete,
356 28
            $internal,
357 28
            $nowait,
358
            $arguments
359 28
        );
360
361 42
        $this->send_method_frame(array($class_id, $method_id), $args);
362
363 42
        if ($nowait) {
364
            return null;
365
        }
366
367 42
        return $this->wait(array(
368 42
            $this->waitHelper->get_wait('exchange.declare_ok')
369 28
        ));
370
    }
371
372
    /**
373
     * Confirms an exchange declaration
374
     * @param AMQPReader $reader
375
     */
376 42
    protected function exchange_declare_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
377
    {
378 42
    }
379
380
    /**
381
     * Deletes an exchange
382
     *
383
     * @param string $exchange
384
     * @param bool $if_unused
385
     * @param bool $nowait
386
     * @param null $ticket
387
     * @return mixed|null
388
     */
389 42 View Code Duplication
    public function exchange_delete(
390
        $exchange,
391
        $if_unused = false,
392
        $nowait = false,
393
        $ticket = null
394
    ) {
395 42
        $ticket = $this->getTicket($ticket);
396 42
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeDelete(
397 28
            $ticket,
398 28
            $exchange,
399 28
            $if_unused,
400
            $nowait
401 28
        );
402
403 42
        $this->send_method_frame(array($class_id, $method_id), $args);
404
405 42
        if ($nowait) {
406
            return null;
407
        }
408
409 42
        return $this->wait(array(
410 42
            $this->waitHelper->get_wait('exchange.delete_ok')
411 28
        ));
412
    }
413
414
    /**
415
     * Confirms deletion of an exchange
416
     *
417
     * @param AMQPReader $reader
418
     */
419 42
    protected function exchange_delete_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
420
    {
421 42
    }
422
423
    /**
424
     * Binds dest exchange to source exchange
425
     *
426
     * @param string $destination
427
     * @param string $source
428
     * @param string $routing_key
429
     * @param bool $nowait
430
     * @param array $arguments
431
     * @param int $ticket
432
     * @return mixed|null
433
     */
434 View Code Duplication
    public function exchange_bind(
435
        $destination,
436
        $source,
437
        $routing_key = '',
438
        $nowait = false,
439
        $arguments = null,
440
        $ticket = null
441
    ) {
442
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
443
        $ticket = $this->getTicket($ticket);
444
445
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeBind(
446
            $ticket,
447
            $destination,
448
            $source,
449
            $routing_key,
450
            $nowait,
451
            $arguments
452
        );
453
454
        $this->send_method_frame(array($class_id, $method_id), $args);
455
456
        if ($nowait) {
457
            return null;
458
        }
459
460
        return $this->wait(array(
461
            $this->waitHelper->get_wait('exchange.bind_ok')
462
        ));
463
    }
464
465
    /**
466
     * Confirms bind successful
467
     * @param AMQPReader $reader
468
     */
469
    protected function exchange_bind_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
470
    {
471
    }
472
473
    /**
474
     * Unbinds dest exchange from source exchange
475
     *
476
     * @param string $destination
477
     * @param string $source
478
     * @param string $routing_key
479
     * @param array $arguments
480
     * @param int $ticket
481
     * @return mixed
482
     */
483 View Code Duplication
    public function exchange_unbind($destination, $source, $routing_key = '', $arguments = null, $ticket = null)
484
    {
485
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
486
        $ticket = $this->getTicket($ticket);
487
488
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(
489
            $ticket,
490
            $destination,
491
            $source,
492
            $routing_key,
493
            $arguments
494
        );
495
496
        $this->send_method_frame(array($class_id, $method_id), $args);
497
498
        return $this->wait(array(
499
            $this->waitHelper->get_wait('exchange.unbind_ok')
500
        ));
501
    }
502
503
    /**
504
     * Confirms unbind successful
505
     *
506
     * @param AMQPReader $reader
507
     */
508
    protected function exchange_unbind_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
509
    {
510
    }
511
512
    /**
513
     * Binds queue to an exchange
514
     *
515
     * @param string $queue
516
     * @param string $exchange
517
     * @param string $routing_key
518
     * @param bool $nowait
519
     * @param array $arguments
520
     * @param int $ticket
521
     * @return mixed|null
522
     */
523 42 View Code Duplication
    public function queue_bind($queue, $exchange, $routing_key = '', $nowait = false, $arguments = null, $ticket = null)
524
    {
525 42
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
526 42
        $ticket = $this->getTicket($ticket);
527
528 42
        list($class_id, $method_id, $args) = $this->protocolWriter->queueBind(
529 28
            $ticket,
530 28
            $queue,
531 28
            $exchange,
532 28
            $routing_key,
533 28
            $nowait,
534
            $arguments
535 28
        );
536
537 42
        $this->send_method_frame(array($class_id, $method_id), $args);
538
539 42
        if ($nowait) {
540
            return null;
541
        }
542
543 42
        return $this->wait(array(
544 42
            $this->waitHelper->get_wait('queue.bind_ok')
545 28
        ));
546
    }
547
548
    /**
549
     * Confirms bind successful
550
     *
551
     * @param AMQPReader $reader
552
     */
553 42
    protected function queue_bind_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
554
    {
555 42
    }
556
557
    /**
558
     * Unbind queue from an exchange
559
     *
560
     * @param string $queue
561
     * @param string $exchange
562
     * @param string $routing_key
563
     * @param array $arguments
564
     * @param int $ticket
565
     * @return mixed
566
     */
567 View Code Duplication
    public function queue_unbind($queue, $exchange, $routing_key = '', $arguments = null, $ticket = null)
568
    {
569
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
570
        $ticket = $this->getTicket($ticket);
571
572
        list($class_id, $method_id, $args) = $this->protocolWriter->queueUnbind(
573
            $ticket,
574
            $queue,
575
            $exchange,
576
            $routing_key,
577
            $arguments
578
        );
579
580
        $this->send_method_frame(array($class_id, $method_id), $args);
581
582
        return $this->wait(array(
583
            $this->waitHelper->get_wait('queue.unbind_ok')
584
        ));
585
    }
586
587
    /**
588
     * Confirms unbind successful
589
     * @param AMQPReader $reader
590
     */
591
    protected function queue_unbind_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
592
    {
593
    }
594
595
    /**
596
     * Declares queue, creates if needed
597
     *
598
     * @param string $queue
599
     * @param bool $passive
600
     * @param bool $durable
601
     * @param bool $exclusive
602
     * @param bool $auto_delete
603
     * @param bool $nowait
604
     * @param array $arguments
605
     * @param int $ticket
606
     * @return mixed|null
607
     */
608 48 View Code Duplication
    public function queue_declare(
609
        $queue = '',
610
        $passive = false,
611
        $durable = false,
612
        $exclusive = false,
613
        $auto_delete = true,
614
        $nowait = false,
615
        $arguments = null,
616
        $ticket = null
617
    ) {
618 48
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
619 48
        $ticket = $this->getTicket($ticket);
620
621 48
        list($class_id, $method_id, $args) = $this->protocolWriter->queueDeclare(
622 32
            $ticket,
623 32
            $queue,
624 32
            $passive,
625 32
            $durable,
626 32
            $exclusive,
627 32
            $auto_delete,
628 32
            $nowait,
629
            $arguments
630 32
        );
631
632 48
        $this->send_method_frame(array($class_id, $method_id), $args);
633
634 48
        if ($nowait) {
635
            return null;
636
        }
637
638 48
        return $this->wait(array(
639 48
            $this->waitHelper->get_wait('queue.declare_ok')
640 32
        ));
641
    }
642
643
    /**
644
     * Confirms a queue definition
645
     *
646
     * @param AMQPReader $reader
647
     * @return string[]
648
     */
649 48
    protected function queue_declare_ok($reader)
650
    {
651 48
        $queue = $reader->read_shortstr();
652 48
        $message_count = $reader->read_long();
653 48
        $consumer_count = $reader->read_long();
654
655 48
        return array($queue, $message_count, $consumer_count);
656
    }
657
658
    /**
659
     * Deletes a queue
660
     *
661
     * @param string $queue
662
     * @param bool $if_unused
663
     * @param bool $if_empty
664
     * @param bool $nowait
665
     * @param int $ticket
666
     * @return mixed|null
667
     */
668 24
    public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
669
    {
670 24
        $ticket = $this->getTicket($ticket);
671
672 24
        list($class_id, $method_id, $args) = $this->protocolWriter->queueDelete(
673 16
            $ticket,
674 16
            $queue,
675 16
            $if_unused,
676 16
            $if_empty,
677
            $nowait
678 16
        );
679
680 24
        $this->send_method_frame(array($class_id, $method_id), $args);
681
682 24
        if ($nowait) {
683
            return null;
684
        }
685
686 24
        return $this->wait(array(
687 24
            $this->waitHelper->get_wait('queue.delete_ok')
688 16
        ));
689
    }
690
691
    /**
692
     * Confirms deletion of a queue
693
     *
694
     * @param AMQPReader $reader
695
     * @return string
696
     */
697 24
    protected function queue_delete_ok($reader)
698
    {
699 24
        return $reader->read_long();
700
    }
701
702
    /**
703
     * Purges a queue
704
     *
705
     * @param string $queue
706
     * @param bool $nowait
707
     * @param int $ticket
708
     * @return mixed|null
709
     */
710 View Code Duplication
    public function queue_purge($queue = '', $nowait = false, $ticket = null)
711
    {
712
        $ticket = $this->getTicket($ticket);
713
        list($class_id, $method_id, $args) = $this->protocolWriter->queuePurge($ticket, $queue, $nowait);
714
715
        $this->send_method_frame(array($class_id, $method_id), $args);
716
717
        if ($nowait) {
718
            return null;
719
        }
720
721
        return $this->wait(array(
722
            $this->waitHelper->get_wait('queue.purge_ok')
723
        ));
724
    }
725
726
    /**
727
     * Confirms a queue purge
728
     *
729
     * @param AMQPReader $reader
730
     * @return string
731
     */
732
    protected function queue_purge_ok($reader)
733
    {
734
        return $reader->read_long();
735
    }
736
737
    /**
738
     * Acknowledges one or more messages
739
     *
740
     * @param string $delivery_tag
741
     * @param bool $multiple
742
     */
743 6
    public function basic_ack($delivery_tag, $multiple = false)
744
    {
745 6
        list($class_id, $method_id, $args) = $this->protocolWriter->basicAck($delivery_tag, $multiple);
746 6
        $this->send_method_frame(array($class_id, $method_id), $args);
747 6
    }
748
749
    /**
750
     * Called when the server sends a basic.ack
751
     *
752
     * @param AMQPReader $reader
753
     * @throws AMQPRuntimeException
754
     */
755 View Code Duplication
    protected function basic_ack_from_server(AMQPReader $reader)
0 ignored issues
show
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
756
    {
757
        $delivery_tag = $reader->read_longlong();
758
        $multiple = (bool) $reader->read_bit();
759
760
        if (!isset($this->published_messages[$delivery_tag])) {
761
            throw new AMQPRuntimeException(sprintf(
762
                'Server ack\'ed unknown delivery_tag "%s"',
763
                $delivery_tag
764
            ));
765
        }
766
767
        $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);
768
    }
769
770
    /**
771
     * Called when the server sends a basic.nack
772
     *
773
     * @param AMQPReader $reader
774
     * @throws AMQPRuntimeException
775
     */
776 View Code Duplication
    protected function basic_nack_from_server($reader)
0 ignored issues
show
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
777
    {
778
        $delivery_tag = $reader->read_longlong();
779
        $multiple = (bool) $reader->read_bit();
780
781
        if (!isset($this->published_messages[$delivery_tag])) {
782
            throw new AMQPRuntimeException(sprintf(
783
                'Server nack\'ed unknown delivery_tag "%s"',
784
                $delivery_tag
785
            ));
786
        }
787
788
        $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);
789
    }
790
791
    /**
792
     * Handles the deletion of messages from this->publishedMessages and dispatches them to the $handler
793
     *
794
     * @param string $delivery_tag
795
     * @param bool $multiple
796
     * @param callable $handler
797
     */
798
    protected function internal_ack_handler($delivery_tag, $multiple, $handler)
799
    {
800
        if ($multiple) {
801
            $keys = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag);
802
803
            foreach ($keys as $key) {
804
                $this->internal_ack_handler($key, false, $handler);
805
            }
806
807
        } else {
808
            $message = $this->get_and_unset_message($delivery_tag);
809
            $this->dispatch_to_handler($handler, array($message));
810
        }
811
    }
812
813
    /**
814
     * @param AMQPMessage[] $messages
815
     * @param string $value
816
     * @return mixed
817
     */
818
    protected function get_keys_less_or_equal(array $messages, $value)
819
    {
820
        $keys = array_reduce(
821
            array_keys($messages),
822
823
            /**
824
             * @param string $key
825
             */
826
            function ($keys, $key) use ($value) {
827
                if (bccomp($key, $value, 0) <= 0) {
828
                    $keys[] = $key;
829
                }
830
831
                return $keys;
832
            },
833
            array()
834
        );
835
836
        return $keys;
837
    }
838
839
    /**
840
     * Rejects one or several received messages
841
     *
842
     * @param string $delivery_tag
843
     * @param bool $multiple
844
     * @param bool $requeue
845
     */
846
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
847
    {
848
        list($class_id, $method_id, $args) = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue);
849
        $this->send_method_frame(array($class_id, $method_id), $args);
850
    }
851
852
    /**
853
     * Ends a queue consumer
854
     *
855
     * @param string $consumer_tag
856
     * @param bool $nowait
857
     * @param bool $noreturn
858
     * @return mixed
859
     */
860 18 View Code Duplication
    public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false)
861
    {
862 18
        list($class_id, $method_id, $args) = $this->protocolWriter->basicCancel($consumer_tag, $nowait);
863 18
        $this->send_method_frame(array($class_id, $method_id), $args);
864
865 18
        if ($nowait || $noreturn) {
866
            unset($this->callbacks[$consumer_tag]);
867
            return $consumer_tag;
868
        }
869
870 18
        return $this->wait(array(
871 18
            $this->waitHelper->get_wait('basic.cancel_ok')
872 12
        ));
873
    }
874
875
    /**
876
     * @param AMQPReader $reader
877
     * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException
878
     */
879
    protected function basic_cancel_from_server(AMQPReader $reader)
880
    {
881
        throw new AMQPBasicCancelException($reader->read_shortstr());
882
    }
883
884
    /**
885
     * Confirm a cancelled consumer
886
     *
887
     * @param AMQPReader $reader
888
     * @return string
889
     */
890 18
    protected function basic_cancel_ok($reader)
891
    {
892 18
        $consumerTag = $reader->read_shortstr();
893 18
        unset($this->callbacks[$consumerTag]);
894
895 18
        return $consumerTag;
896
    }
897
898
    /**
899
     * Starts a queue consumer
900
     *
901
     * @param string $queue
902
     * @param string $consumer_tag
903
     * @param bool $no_local
904
     * @param bool $no_ack
905
     * @param bool $exclusive
906
     * @param bool $nowait
907
     * @param callback|null $callback
908
     * @param int|null $ticket
909
     * @param array $arguments
910
     * @return mixed|string
911
     */
912 18
    public function basic_consume(
913
        $queue = '',
914
        $consumer_tag = '',
915
        $no_local = false,
916
        $no_ack = false,
917
        $exclusive = false,
918
        $nowait = false,
919
        $callback = null,
920
        $ticket = null,
921
        $arguments = array()
922
    ) {
923 18
        $ticket = $this->getTicket($ticket);
924 18
        list($class_id, $method_id, $args) = $this->protocolWriter->basicConsume(
925 12
            $ticket,
926 12
            $queue,
927 12
            $consumer_tag,
928 12
            $no_local,
929 12
            $no_ack,
930 12
            $exclusive,
931 12
            $nowait,
932 18
            $this->protocolVersion == '0.9.1' ? $arguments : null
933 12
        );
934
935 18
        $this->send_method_frame(array($class_id, $method_id), $args);
936
937 18
        if (false === $nowait) {
938 18
            $consumer_tag = $this->wait(array(
939 18
                $this->waitHelper->get_wait('basic.consume_ok')
940 12
            ));
941 12
        }
942
943 18
        $this->callbacks[$consumer_tag] = $callback;
944
945 18
        return $consumer_tag;
946
    }
947
948
    /**
949
     * Confirms a new consumer
950
     *
951
     * @param AMQPReader $reader
952
     * @return string
953
     */
954 18
    protected function basic_consume_ok($reader)
955
    {
956 18
        return $reader->read_shortstr();
957
    }
958
959
    /**
960
     * Notifies the client of a consumer message
961
     *
962
     * @param AMQPReader $reader
963
     * @param AMQPMessage $message
964
     */
965 18
    protected function basic_deliver($reader, $message)
966
    {
967 18
        $consumer_tag = $reader->read_shortstr();
968 18
        $delivery_tag = $reader->read_longlong();
969 18
        $redelivered = $reader->read_bit();
970 18
        $exchange = $reader->read_shortstr();
971 18
        $routing_key = $reader->read_shortstr();
972
973 18
        $message->delivery_info = array(
974 18
            'channel' => $this,
975 18
            'consumer_tag' => $consumer_tag,
976 18
            'delivery_tag' => $delivery_tag,
977 18
            'redelivered' => $redelivered,
978 18
            'exchange' => $exchange,
979 6
            'routing_key' => $routing_key
980 12
        );
981
982 18
        if (isset($this->callbacks[$consumer_tag])) {
983 18
            call_user_func($this->callbacks[$consumer_tag], $message);
984 12
        }
985 18
    }
986
987
    /**
988
     * Direct access to a queue if no message was available in the queue, return null
989
     *
990
     * @param string $queue
991
     * @param bool $no_ack
992
     * @param int $ticket
993
     * @return mixed
994
     */
995 24
    public function basic_get($queue = '', $no_ack = false, $ticket = null)
996
    {
997 24
        $ticket = $this->getTicket($ticket);
998 24
        list($class_id, $method_id, $args) = $this->protocolWriter->basicGet($ticket, $queue, $no_ack);
999
1000 24
        $this->send_method_frame(array($class_id, $method_id), $args);
1001
1002 24
        return $this->wait(array(
1003 24
            $this->waitHelper->get_wait('basic.get_ok'),
1004 24
            $this->waitHelper->get_wait('basic.get_empty')
1005 16
        ));
1006
    }
1007
1008
    /**
1009
     * Indicates no messages available
1010
     *
1011
     * @param AMQPReader $reader
1012
     */
1013
    protected function basic_get_empty($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1014
    {
1015
    }
1016
1017
    /**
1018
     * Provides client with a message
1019
     *
1020
     * @param AMQPReader $reader
1021
     * @param AMQPMessage $message
1022
     * @return AMQPMessage
1023
     */
1024 24
    protected function basic_get_ok($reader, $message)
1025
    {
1026 24
        $delivery_tag = $reader->read_longlong();
1027 24
        $redelivered = $reader->read_bit();
1028 24
        $exchange = $reader->read_shortstr();
1029 24
        $routing_key = $reader->read_shortstr();
1030 24
        $message_count = $reader->read_long();
1031
1032 24
        $message->delivery_info = array(
1033 24
            'delivery_tag' => $delivery_tag,
1034 24
            'redelivered' => $redelivered,
1035 24
            'exchange' => $exchange,
1036 24
            'routing_key' => $routing_key,
1037 8
            'message_count' => $message_count
1038 16
        );
1039
1040 24
        return $message;
1041
    }
1042
1043
    /**
1044
     * @param string $exchange
1045
     * @param string $routing_key
1046
     * @param $mandatory
1047
     * @param $immediate
1048
     * @param int $ticket
1049
     * @return mixed
1050
     */
1051 42
    private function pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket)
1052
    {
1053 42
        $cache_key = sprintf(
1054 42
            '%s|%s|%s|%s|%s',
1055 28
            $exchange,
1056 28
            $routing_key,
1057 28
            $mandatory,
1058 28
            $immediate,
1059
            $ticket
1060 28
        );
1061 42
        if (false === isset($this->publish_cache[$cache_key])) {
1062 42
            $ticket = $this->getTicket($ticket);
1063 42
            list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish(
1064 28
                $ticket,
1065 28
                $exchange,
1066 28
                $routing_key,
1067 28
                $mandatory,
1068
                $immediate
1069 28
            );
1070
1071 42
            $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args);
1072 42
            $this->publish_cache[$cache_key] = $pkt->getvalue();
1073 42
            if (count($this->publish_cache) > $this->publish_cache_max_size) {
1074
                reset($this->publish_cache);
1075
                $old_key = key($this->publish_cache);
1076
                unset($this->publish_cache[$old_key]);
1077
            }
1078 28
        }
1079
1080 42
        return $this->publish_cache[$cache_key];
1081
    }
1082
1083
    /**
1084
     * Publishes a message
1085
     *
1086
     * @param AMQPMessage $msg
1087
     * @param string $exchange
1088
     * @param string $routing_key
1089
     * @param bool $mandatory
1090
     * @param bool $immediate
1091
     * @param int $ticket
1092
     */
1093 42
    public function basic_publish(
1094
        $msg,
1095
        $exchange = '',
1096
        $routing_key = '',
1097
        $mandatory = false,
1098
        $immediate = false,
1099
        $ticket = null
1100
    ) {
1101 42
        $pkt = new AMQPWriter();
1102 42
        $pkt->write($this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket));
1103
1104 42
        $this->connection->send_content(
1105 42
            $this->channel_id,
1106 42
            60,
1107 42
            0,
1108 42
            mb_strlen($msg->body, 'ASCII'),
1109 42
            $msg->serialize_properties(),
1110 42
            $msg->body,
1111
            $pkt
1112 28
        );
1113
1114 42 View Code Duplication
        if ($this->next_delivery_tag > 0) {
1115
            $this->published_messages[$this->next_delivery_tag] = $msg;
1116
            $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
1117
        }
1118 42
    }
1119
1120
    /**
1121
     * @param AMQPMessage $msg
1122
     * @param string $exchange
1123
     * @param string $routing_key
1124
     * @param bool $mandatory
1125
     * @param bool $immediate
1126
     * @param int $ticket
1127
     */
1128
    public function batch_basic_publish(
1129
        $msg,
1130
        $exchange = '',
1131
        $routing_key = '',
1132
        $mandatory = false,
1133
        $immediate = false,
1134
        $ticket = null
1135
    ) {
1136
        $this->batch_messages[] = func_get_args();
1137
    }
1138
1139
    /**
1140
     * Publish batch
1141
     *
1142
     * @return void
1143
     */
1144
    public function publish_batch()
1145
    {
1146
        if (empty($this->batch_messages)) {
1147
            return null;
1148
        }
1149
1150
        /** @var AMQPWriter $pkt */
1151
        $pkt = new AMQPWriter();
1152
1153
        /** @var AMQPMessage $msg */
1154
        foreach ($this->batch_messages as $m) {
1155
            $msg = $m[0];
1156
1157
            $exchange = isset($m[1]) ? $m[1] : '';
1158
            $routing_key = isset($m[2]) ? $m[2] : '';
1159
            $mandatory = isset($m[3]) ? $m[3] : false;
1160
            $immediate = isset($m[4]) ? $m[4] : false;
1161
            $ticket = isset($m[5]) ? $m[5] : null;
1162
            $pkt->write($this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket));
1163
1164
            $this->connection->prepare_content(
1165
                $this->channel_id,
1166
                60,
1167
                0,
1168
                mb_strlen($msg->body, 'ASCII'),
1169
                $msg->serialize_properties(),
1170
                $msg->body,
1171
                $pkt
1172
            );
1173
1174 View Code Duplication
            if ($this->next_delivery_tag > 0) {
1175
                $this->published_messages[$this->next_delivery_tag] = $msg;
1176
                $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
1177
            }
1178
        }
1179
1180
        //call write here
1181
        $this->connection->write($pkt->getvalue());
1182
        $this->batch_messages = array();
1183
    }
1184
1185
    /**
1186
     * Specifies QoS
1187
     *
1188
     * @param int $prefetch_size
1189
     * @param int $prefetch_count
1190
     * @param bool $a_global
1191
     * @return mixed
1192
     */
1193
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)
1194
    {
1195
        list($class_id, $method_id, $args) = $this->protocolWriter->basicQos(
1196
            $prefetch_size,
1197
            $prefetch_count,
1198
            $a_global
1199
        );
1200
1201
        $this->send_method_frame(array($class_id, $method_id), $args);
1202
1203
        return $this->wait(array(
1204
            $this->waitHelper->get_wait('basic.qos_ok')
1205
        ));
1206
    }
1207
1208
    /**
1209
     * Confirms QoS request
1210
     * @param AMQPReader $reader
1211
     */
1212
    protected function basic_qos_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1213
    {
1214
    }
1215
1216
    /**
1217
     * Redelivers unacknowledged messages
1218
     *
1219
     * @param bool $requeue
1220
     * @return mixed
1221
     */
1222
    public function basic_recover($requeue = false)
1223
    {
1224
        list($class_id, $method_id, $args) = $this->protocolWriter->basicRecover($requeue);
1225
        $this->send_method_frame(array($class_id, $method_id), $args);
1226
1227
        return $this->wait(array(
1228
            $this->waitHelper->get_wait('basic.recover_ok')
1229
        ));
1230
    }
1231
1232
    /**
1233
     * Confirm the requested recover
1234
     * @param AMQPReader $reader
1235
     */
1236
    protected function basic_recover_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1237
    {
1238
    }
1239
1240
    /**
1241
     * Rejects an incoming message
1242
     *
1243
     * @param string $delivery_tag
1244
     * @param bool $requeue
1245
     */
1246
    public function basic_reject($delivery_tag, $requeue)
1247
    {
1248
        list($class_id, $method_id, $args) = $this->protocolWriter->basicReject($delivery_tag, $requeue);
1249
        $this->send_method_frame(array($class_id, $method_id), $args);
1250
    }
1251
1252
    /**
1253
     * Returns a failed message
1254
     *
1255
     * @param AMQPReader $reader
1256
     * @param AMQPMessage $message
1257
     * @return null
1258
     */
1259
    protected function basic_return($reader, $message)
1260
    {
1261
        $callback = $this->basic_return_callback;
1262
        if (!is_callable($callback)) {
1263
            $this->debug->debug_msg('Skipping unhandled basic_return message');
1264
            return null;
1265
        }
1266
1267
        $reply_code = $reader->read_short();
1268
        $reply_text = $reader->read_shortstr();
1269
        $exchange = $reader->read_shortstr();
1270
        $routing_key = $reader->read_shortstr();
1271
1272
        call_user_func_array($callback, array(
1273
            $reply_code,
1274
            $reply_text,
1275
            $exchange,
1276
            $routing_key,
1277
            $message,
1278
        ));
1279
    }
1280
1281
    /**
1282
     * @return mixed
1283
     */
1284
    public function tx_commit()
1285
    {
1286
        $this->send_method_frame(array(90, 20));
1287
1288
        return $this->wait(array(
1289
            $this->waitHelper->get_wait('tx.commit_ok')
1290
        ));
1291
    }
1292
1293
    /**
1294
     * Confirms a successful commit
1295
     * @param AMQPReader $reader
1296
     */
1297
    protected function tx_commit_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1298
    {
1299
    }
1300
1301
    /**
1302
     * Rollbacks the current transaction
1303
     *
1304
     * @return mixed
1305
     */
1306
    public function tx_rollback()
1307
    {
1308
        $this->send_method_frame(array(90, 30));
1309
1310
        return $this->wait(array(
1311
            $this->waitHelper->get_wait('tx.rollback_ok')
1312
        ));
1313
    }
1314
1315
    /**
1316
     * Confirms a successful rollback
1317
     *
1318
     * @param AMQPReader $reader
1319
     */
1320
    protected function tx_rollback_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1321
    {
1322
    }
1323
1324
    /**
1325
     * Puts the channel into confirm mode
1326
     * Beware that only non-transactional channels may be put into confirm mode and vice versa
1327
     *
1328
     * @param bool $nowait
1329
     * @return null
1330
     */
1331 View Code Duplication
    public function confirm_select($nowait = false)
1332
    {
1333
        list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);
1334
1335
        $this->send_method_frame(array($class_id, $method_id), $args);
1336
1337
        if ($nowait) {
1338
            return null;
1339
        }
1340
1341
        $this->wait(array($this->waitHelper->get_wait('confirm.select_ok')));
1342
        $this->next_delivery_tag = 1;
1343
    }
1344
1345
    /**
1346
     * Confirms a selection
1347
     *
1348
     * @param AMQPReader $reader
1349
     */
1350
    public function confirm_select_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1351
    {
1352
    }
1353
1354
    /**
1355
     * Waits for pending acks and nacks from the server.
1356
     * If there are no pending acks, the method returns immediately
1357
     *
1358
     * @param int $timeout Waits until $timeout value is reached
1359
     */
1360 View Code Duplication
    public function wait_for_pending_acks($timeout = 0)
1361
    {
1362
        $functions = array(
1363
            $this->waitHelper->get_wait('basic.ack'),
1364
            $this->waitHelper->get_wait('basic.nack'),
1365
        );
1366
1367
        while (count($this->published_messages) !== 0) {
1368
            if ($timeout > 0) {
1369
                $this->wait($functions, true, $timeout);
1370
            } else {
1371
                $this->wait($functions);
1372
            }
1373
        }
1374
    }
1375
1376
    /**
1377
     * Waits for pending acks, nacks and returns from the server. If there are no pending acks, the method returns immediately.
1378
     *
1379
     * @param int $timeout If set to value > 0 the method will wait at most $timeout seconds for pending acks.
1380
     */
1381 View Code Duplication
    public function wait_for_pending_acks_returns($timeout = 0)
1382
    {
1383
        $functions = array(
1384
            $this->waitHelper->get_wait('basic.ack'),
1385
            $this->waitHelper->get_wait('basic.nack'),
1386
            $this->waitHelper->get_wait('basic.return'),
1387
        );
1388
1389
        while (count($this->published_messages) !== 0) {
1390
            if ($timeout > 0) {
1391
                $this->wait($functions, true, $timeout);
1392
            } else {
1393
                $this->wait($functions);
1394
            }
1395
        }
1396
    }
1397
1398
    /**
1399
     * Selects standard transaction mode
1400
     *
1401
     * @return mixed
1402
     */
1403
    public function tx_select()
1404
    {
1405
        $this->send_method_frame(array(90, 10));
1406
1407
        return $this->wait(array(
1408
            $this->waitHelper->get_wait('tx.select_ok')
1409
        ));
1410
    }
1411
1412
    /**
1413
     * Confirms transaction mode
1414
     * @param AMQPReader $reader
1415
     */
1416
    protected function tx_select_ok($reader)
0 ignored issues
show
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1417
    {
1418
    }
1419
1420
    /**
1421
     * @param array $arguments
1422
     * @return array
1423
     */
1424 48
    protected function getArguments($arguments)
1425
    {
1426 48
        return (null === $arguments) ? array() : $arguments;
1427
    }
1428
1429
    /**
1430
     * @param int $ticket
1431
     * @return int
1432
     */
1433 48
    protected function getTicket($ticket)
1434
    {
1435 48
        return (null === $ticket) ? $this->default_ticket : $ticket;
1436
    }
1437
1438
    /**
1439
     * Helper method to get a particular method from $this->publishedMessages, removes it from the array and returns it.
1440
     *
1441
     * @param int $index
1442
     * @return AMQPMessage
1443
     */
1444
    protected function get_and_unset_message($index)
1445
    {
1446
        $message = $this->published_messages[$index];
1447
        unset($this->published_messages[$index]);
1448
1449
        return $message;
1450
    }
1451
1452
    /**
1453
     * Sets callback for basic_return
1454
     *
1455
     * @param  callable $callback
1456
     * @throws \InvalidArgumentException if $callback is not callable
1457
     */
1458 View Code Duplication
    public function set_return_listener($callback)
1459
    {
1460
        if (!is_callable($callback)) {
1461
            throw new \InvalidArgumentException(sprintf(
1462
                'Given callback "%s" should be callable. %s type was given.',
1463
                $callback,
1464
                gettype($callback)
1465
            ));
1466
        }
1467
1468
        $this->basic_return_callback = $callback;
1469
    }
1470
1471
    /**
1472
     * Sets a handler which called for any message nack'ed by the server, with the AMQPMessage as first argument.
1473
     *
1474
     * @param callable $callback
1475
     * @throws \InvalidArgumentException
1476
     */
1477 View Code Duplication
    public function set_nack_handler($callback)
1478
    {
1479
        if (!is_callable($callback)) {
1480
            throw new \InvalidArgumentException(sprintf(
1481
                'Given callback "%s" should be callable. %s type was given.',
1482
                $callback,
1483
                gettype($callback)
1484
            ));
1485
        }
1486
1487
        $this->nack_handler = $callback;
1488
    }
1489
1490
    /**
1491
     * Sets a handler which called for any message ack'ed by the server, with the AMQPMessage as first argument.
1492
     *
1493
     * @param callable $callback
1494
     * @throws \InvalidArgumentException
1495
     */
1496 View Code Duplication
    public function set_ack_handler($callback)
1497
    {
1498
        if (!is_callable($callback)) {
1499
            throw new \InvalidArgumentException(sprintf(
1500
                'Given callback "%s" should be callable. %s type was given.',
1501
                $callback,
1502
                gettype($callback)
1503
            ));
1504
        }
1505
1506
        $this->ack_handler = $callback;
1507
    }
1508
}
1509