Completed
Push — master ( c3ed14...f41031 )
by
unknown
13s
created

AMQPChannel::internal_ack_handler()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3.8449

Importance

Changes 0
Metric Value
dl 0
loc 15
ccs 6
cts 11
cp 0.5455
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 9
nc 3
nop 3
crap 3.8449
1
<?php
2
namespace PhpAmqpLib\Channel;
3
4
use PhpAmqpLib\Exception\AMQPBasicCancelException;
5
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
6
use PhpAmqpLib\Exception\AMQPRuntimeException;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use PhpAmqpLib\Wire\AMQPReader;
9
use PhpAmqpLib\Wire\AMQPWriter;
10
11
class AMQPChannel extends AbstractChannel
12
{
13
    /** @var array */
14
    public $callbacks = array();
15
16
    /** @var bool Whether or not the channel has been "opened" */
17
    protected $is_open = false;
18
19
    /** @var int */
20
    protected $default_ticket;
21
22
    /** @var bool */
23
    protected $active;
24
25
    /** @var array */
26
    protected $alerts;
27
28
    /** @var bool */
29
    protected $auto_decode;
30
31
    /**
32
     * These parameters will be passed to function in case of basic_return:
33
     *    param int $reply_code
34
     *    param string $reply_text
35
     *    param string $exchange
36
     *    param string $routing_key
37
     *    param AMQPMessage $msg
38
     *
39
     * @var callable
40
     */
41
    protected $basic_return_callback;
42
43
    /** @var array Used to keep track of the messages that are going to be batch published. */
44
    protected $batch_messages = array();
45
46
    /**
47
     * If the channel is in confirm_publish mode this array will store all published messages
48
     * until they get ack'ed or nack'ed
49
     *
50
     * @var AMQPMessage[]
51
     */
52
    private $published_messages = array();
53
54
    /** @var int */
55
    private $next_delivery_tag = 0;
56
57
    /** @var callable */
58
    private $ack_handler;
59
60
    /** @var callable */
61
    private $nack_handler;
62
63
    /**
64
     * Circular buffer to speed up both basic_publish() and publish_batch().
65
     * Max size limited by $publish_cache_max_size.
66
     *
67
     * @var array
68
     * @see basic_publish()
69
     * @see publish_batch()
70
     */
71
    private $publish_cache;
72
73
    /**
74
     * Maximal size of $publish_cache
75
     *
76
     * @var int
77
     */
78
    private $publish_cache_max_size;
79
80
    /**
81
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
82
     * @param null $channel_id
83
     * @param bool $auto_decode
84
     * @throws \Exception
85
     */
86 84
    public function __construct($connection, $channel_id = null, $auto_decode = true)
87
    {
88 84
        if ($channel_id == null) {
89
            $channel_id = $connection->get_free_channel_id();
90
        }
91
92 84
        parent::__construct($connection, $channel_id);
93
94 84
        $this->publish_cache = array();
95 84
        $this->publish_cache_max_size = 100;
96
97 84
        $this->debug->debug_msg('using channel_id: ' . $channel_id);
98
99 84
        $this->default_ticket = 0;
100 84
        $this->is_open = false;
101 84
        $this->active = true; // Flow control
102 84
        $this->alerts = array();
103 84
        $this->callbacks = array();
104 84
        $this->auto_decode = $auto_decode;
105
106
        try {
107 84
            $this->x_open();
108 56
        } catch (\Exception $e) {
109
            $this->close();
110
            throw $e;
111
        }
112 84
    }
113
114
    /**
115
     * Tear down this object, after we've agreed to close with the server.
116
     */
117 90
    protected function do_close()
118
    {
119 90
        if ($this->channel_id !== null) {
120 84
            unset($this->connection->channels[$this->channel_id]);
121 56
        }
122 90
        $this->channel_id = $this->connection = null;
123 90
        $this->is_open = false;
124 90
    }
125
126
    /**
127
     * Only for AMQP0.8.0
128
     * This method allows the server to send a non-fatal warning to
129
     * the client.  This is used for methods that are normally
130
     * asynchronous and thus do not have confirmations, and for which
131
     * the server may detect errors that need to be reported.  Fatal
132
     * errors are handled as channel or connection exceptions; non-
133
     * fatal errors are sent through this method.
134
     *
135
     * @param AMQPReader $reader
136
     */
137
    protected function channel_alert($reader)
138
    {
139
        $reply_code = $reader->read_short();
140
        $reply_text = $reader->read_shortstr();
141
        $details = $reader->read_table();
142
        array_push($this->alerts, array($reply_code, $reply_text, $details));
143
    }
144
145
    /**
146
     * Request a channel close
147
     *
148
     * @param int $reply_code
149
     * @param string $reply_text
150
     * @param array $method_sig
151
     * @return mixed
152
     */
153 90
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
154
    {
155 90
        if ($this->is_open === false || $this->connection === null) {
156 12
            $this->do_close();
157
158 12
            return null; // already closed
159
        }
160 84
        list($class_id, $method_id, $args) = $this->protocolWriter->channelClose(
161 56
            $reply_code,
162 56
            $reply_text,
163 84
            $method_sig[0],
164 84
            $method_sig[1]
165 56
        );
166
167 84
        $this->send_method_frame(array($class_id, $method_id), $args);
168
169 84
        return $this->wait(array(
170 84
            $this->waitHelper->get_wait('channel.close_ok')
171 56
        ));
172
    }
173
174
    /**
175
     * @param AMQPReader $reader
176
     * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException
177
     */
178 12
    protected function channel_close($reader)
179
    {
180 12
        $reply_code = $reader->read_short();
181 12
        $reply_text = $reader->read_shortstr();
182 12
        $class_id = $reader->read_short();
183 12
        $method_id = $reader->read_short();
184
185 12
        $this->send_method_frame(array(20, 41));
186 12
        $this->do_close();
187
188 12
        throw new AMQPProtocolChannelException($reply_code, $reply_text, array($class_id, $method_id));
189
    }
190
191
    /**
192
     * Confirm a channel close
193
     * Alias of AMQPChannel::do_close()
194
     *
195
     * @param AMQPReader $reader
196
     */
197 84
    protected function channel_close_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
198
    {
199 84
        $this->do_close();
200 84
    }
201
202
    /**
203
     * Enables/disables flow from peer
204
     *
205
     * @param $active
206
     * @return mixed
207
     */
208
    public function flow($active)
209
    {
210
        list($class_id, $method_id, $args) = $this->protocolWriter->channelFlow($active);
211
        $this->send_method_frame(array($class_id, $method_id), $args);
212
213
        return $this->wait(array(
214
            $this->waitHelper->get_wait('channel.flow_ok')
215
        ));
216
    }
217
218
    /**
219
     * @param AMQPReader $reader
220
     */
221
    protected function channel_flow($reader)
222
    {
223
        $this->active = $reader->read_bit();
224
        $this->x_flow_ok($this->active);
225
    }
226
227
    /**
228
     * @param bool $active
229
     */
230
    protected function x_flow_ok($active)
231
    {
232
        list($class_id, $method_id, $args) = $this->protocolWriter->channelFlow($active);
233
        $this->send_method_frame(array($class_id, $method_id), $args);
234
    }
235
236
    /**
237
     * @param AMQPReader $reader
238
     * @return bool
239
     */
240
    protected function channel_flow_ok($reader)
241
    {
242
        return $reader->read_bit();
243
    }
244
245
    /**
246
     * @param string $out_of_band
247
     * @return mixed
248
     */
249 84 View Code Duplication
    protected function x_open($out_of_band = '')
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
250
    {
251 84
        if ($this->is_open) {
252
            return null;
253
        }
254
255 84
        list($class_id, $method_id, $args) = $this->protocolWriter->channelOpen($out_of_band);
256 84
        $this->send_method_frame(array($class_id, $method_id), $args);
257
258 84
        return $this->wait(array(
259 84
            $this->waitHelper->get_wait('channel.open_ok')
260 56
        ));
261
    }
262
263
    /**
264
     * @param AMQPReader $reader
265
     */
266 84
    protected function channel_open_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
267
    {
268 84
        $this->is_open = true;
269
270 84
        $this->debug->debug_msg('Channel open');
271 84
    }
272
273
    /**
274
     * Requests an access ticket
275
     *
276
     * @param string $realm
277
     * @param bool $exclusive
278
     * @param bool $passive
279
     * @param bool $active
280
     * @param bool $write
281
     * @param bool $read
282
     * @return mixed
283
     */
284
    public function access_request(
285
        $realm,
286
        $exclusive = false,
287
        $passive = false,
288
        $active = false,
289
        $write = false,
290
        $read = false
291
    ) {
292
        list($class_id, $method_id, $args) = $this->protocolWriter->accessRequest(
293
            $realm,
294
            $exclusive,
295
            $passive,
296
            $active,
297
            $write,
298
            $read
299
        );
300
301
        $this->send_method_frame(array($class_id, $method_id), $args);
302
303
        return $this->wait(array(
304
            $this->waitHelper->get_wait('access.request_ok')
305
        ));
306
    }
307
308
    /**
309
     * Grants access to server resources
310
     *
311
     * @param AMQPReader $reader
312
     * @return string
313
     */
314
    protected function access_request_ok($reader)
315
    {
316
        $this->default_ticket = $reader->read_short();
317
318
        return $this->default_ticket;
319
    }
320
321
    /**
322
     * Declares exchange
323
     *
324
     * @param string $exchange
325
     * @param string $type
326
     * @param bool $passive
327
     * @param bool $durable
328
     * @param bool $auto_delete
329
     * @param bool $internal
330
     * @param bool $nowait
331
     * @param array $arguments
332
     * @param int $ticket
333
     * @return mixed|null
334
     */
335 78 View Code Duplication
    public function exchange_declare(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
336
        $exchange,
337
        $type,
338
        $passive = false,
339
        $durable = false,
340
        $auto_delete = true,
341
        $internal = false,
342
        $nowait = false,
343
        $arguments = null,
344
        $ticket = null
345
    ) {
346 78
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
Bug introduced by
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
347 78
        $ticket = $this->getTicket($ticket);
348
349 78
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeDeclare(
350 52
            $ticket,
351 52
            $exchange,
352 52
            $type,
353 52
            $passive,
354 52
            $durable,
355 52
            $auto_delete,
356 52
            $internal,
357 52
            $nowait,
358 26
            $arguments
359 52
        );
360
361 78
        $this->send_method_frame(array($class_id, $method_id), $args);
362
363 78
        if ($nowait) {
364
            return null;
365
        }
366
367 78
        return $this->wait(array(
368 78
            $this->waitHelper->get_wait('exchange.declare_ok')
369 52
        ));
370
    }
371
372
    /**
373
     * Confirms an exchange declaration
374
     * @param AMQPReader $reader
375
     */
376 78
    protected function exchange_declare_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
377
    {
378 78
    }
379
380
    /**
381
     * Deletes an exchange
382
     *
383
     * @param string $exchange
384
     * @param bool $if_unused
385
     * @param bool $nowait
386
     * @param null $ticket
387
     * @return mixed|null
388
     */
389 78 View Code Duplication
    public function exchange_delete(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
390
        $exchange,
391
        $if_unused = false,
392
        $nowait = false,
393
        $ticket = null
394
    ) {
395 78
        $ticket = $this->getTicket($ticket);
396 78
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeDelete(
397 52
            $ticket,
398 52
            $exchange,
399 52
            $if_unused,
400 26
            $nowait
401 52
        );
402
403 78
        $this->send_method_frame(array($class_id, $method_id), $args);
404
405 78
        if ($nowait) {
406
            return null;
407
        }
408
409 78
        return $this->wait(array(
410 78
            $this->waitHelper->get_wait('exchange.delete_ok')
411 52
        ));
412
    }
413
414
    /**
415
     * Confirms deletion of an exchange
416
     *
417
     * @param AMQPReader $reader
418
     */
419 78
    protected function exchange_delete_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
420
    {
421 78
    }
422
423
    /**
424
     * Binds dest exchange to source exchange
425
     *
426
     * @param string $destination
427
     * @param string $source
428
     * @param string $routing_key
429
     * @param bool $nowait
430
     * @param array $arguments
431
     * @param int $ticket
432
     * @return mixed|null
433
     */
434 View Code Duplication
    public function exchange_bind(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
435
        $destination,
436
        $source,
437
        $routing_key = '',
438
        $nowait = false,
439
        $arguments = null,
440
        $ticket = null
441
    ) {
442
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
Bug introduced by
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
443
        $ticket = $this->getTicket($ticket);
444
445
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeBind(
446
            $ticket,
447
            $destination,
448
            $source,
449
            $routing_key,
450
            $nowait,
451
            $arguments
452
        );
453
454
        $this->send_method_frame(array($class_id, $method_id), $args);
455
456
        if ($nowait) {
457
            return null;
458
        }
459
460
        return $this->wait(array(
461
            $this->waitHelper->get_wait('exchange.bind_ok')
462
        ));
463
    }
464
465
    /**
466
     * Confirms bind successful
467
     * @param AMQPReader $reader
468
     */
469
    protected function exchange_bind_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
470
    {
471
    }
472
473
    /**
474
     * Unbinds dest exchange from source exchange
475
     *
476
     * @param string $destination
477
     * @param string $source
478
     * @param string $routing_key
479
     * @param bool $nowait
480
     * @param array $arguments
481
     * @param int $ticket
482
     * @return mixed
483
     */
484 View Code Duplication
    public function exchange_unbind($destination, $source, $routing_key = '', $nowait = false, $arguments = null, $ticket = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
485
    {
486
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
Bug introduced by
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
487
        $ticket = $this->getTicket($ticket);
488
489
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(
490
            $ticket,
491
            $destination,
492
            $source,
493
            $routing_key,
494
            $nowait,
495
            $arguments
496
        );
497
498
        $this->send_method_frame(array($class_id, $method_id), $args);
499
500
        return $this->wait(array(
501
            $this->waitHelper->get_wait('exchange.unbind_ok')
502
        ));
503
    }
504
505
    /**
506
     * Confirms unbind successful
507
     *
508
     * @param AMQPReader $reader
509
     */
510
    protected function exchange_unbind_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
511
    {
512
    }
513
514
    /**
515
     * Binds queue to an exchange
516
     *
517
     * @param string $queue
518
     * @param string $exchange
519
     * @param string $routing_key
520
     * @param bool $nowait
521
     * @param array $arguments
522
     * @param int $ticket
523
     * @return mixed|null
524
     */
525 78 View Code Duplication
    public function queue_bind($queue, $exchange, $routing_key = '', $nowait = false, $arguments = null, $ticket = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
526
    {
527 78
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
Bug introduced by
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
528 78
        $ticket = $this->getTicket($ticket);
529
530 78
        list($class_id, $method_id, $args) = $this->protocolWriter->queueBind(
531 52
            $ticket,
532 52
            $queue,
533 52
            $exchange,
534 52
            $routing_key,
535 52
            $nowait,
536 26
            $arguments
537 52
        );
538
539 78
        $this->send_method_frame(array($class_id, $method_id), $args);
540
541 78
        if ($nowait) {
542
            return null;
543
        }
544
545 78
        return $this->wait(array(
546 78
            $this->waitHelper->get_wait('queue.bind_ok')
547 52
        ));
548
    }
549
550
    /**
551
     * Confirms bind successful
552
     *
553
     * @param AMQPReader $reader
554
     */
555 78
    protected function queue_bind_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
556
    {
557 78
    }
558
559
    /**
560
     * Unbind queue from an exchange
561
     *
562
     * @param string $queue
563
     * @param string $exchange
564
     * @param string $routing_key
565
     * @param array $arguments
566
     * @param int $ticket
567
     * @return mixed
568
     */
569 View Code Duplication
    public function queue_unbind($queue, $exchange, $routing_key = '', $arguments = null, $ticket = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
570
    {
571
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
Bug introduced by
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
572
        $ticket = $this->getTicket($ticket);
573
574
        list($class_id, $method_id, $args) = $this->protocolWriter->queueUnbind(
575
            $ticket,
576
            $queue,
577
            $exchange,
578
            $routing_key,
579
            $arguments
580
        );
581
582
        $this->send_method_frame(array($class_id, $method_id), $args);
583
584
        return $this->wait(array(
585
            $this->waitHelper->get_wait('queue.unbind_ok')
586
        ));
587
    }
588
589
    /**
590
     * Confirms unbind successful
591
     * @param AMQPReader $reader
592
     */
593
    protected function queue_unbind_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
594
    {
595
    }
596
597
    /**
598
     * Declares queue, creates if needed
599
     *
600
     * @param string $queue
601
     * @param bool $passive
602
     * @param bool $durable
603
     * @param bool $exclusive
604
     * @param bool $auto_delete
605
     * @param bool $nowait
606
     * @param array $arguments
607
     * @param int $ticket
608
     * @return mixed|null
609
     */
610 84 View Code Duplication
    public function queue_declare(
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
611
        $queue = '',
612
        $passive = false,
613
        $durable = false,
614
        $exclusive = false,
615
        $auto_delete = true,
616
        $nowait = false,
617
        $arguments = null,
618
        $ticket = null
619
    ) {
620 84
        $arguments = $this->getArguments($arguments);
0 ignored issues
show
Bug introduced by
It seems like $arguments can also be of type null; however, PhpAmqpLib\Channel\AMQPChannel::getArguments() does only seem to accept array, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
621 84
        $ticket = $this->getTicket($ticket);
622
623 84
        list($class_id, $method_id, $args) = $this->protocolWriter->queueDeclare(
624 56
            $ticket,
625 56
            $queue,
626 56
            $passive,
627 56
            $durable,
628 56
            $exclusive,
629 56
            $auto_delete,
630 56
            $nowait,
631 28
            $arguments
632 56
        );
633
634 84
        $this->send_method_frame(array($class_id, $method_id), $args);
635
636 84
        if ($nowait) {
637
            return null;
638
        }
639
640 84
        return $this->wait(array(
641 84
            $this->waitHelper->get_wait('queue.declare_ok')
642 56
        ));
643
    }
644
645
    /**
646
     * Confirms a queue definition
647
     *
648
     * @param AMQPReader $reader
649
     * @return string[]
650
     */
651 84
    protected function queue_declare_ok($reader)
652
    {
653 84
        $queue = $reader->read_shortstr();
654 84
        $message_count = $reader->read_long();
655 84
        $consumer_count = $reader->read_long();
656
657 84
        return array($queue, $message_count, $consumer_count);
658
    }
659
660
    /**
661
     * Deletes a queue
662
     *
663
     * @param string $queue
664
     * @param bool $if_unused
665
     * @param bool $if_empty
666
     * @param bool $nowait
667
     * @param int $ticket
668
     * @return mixed|null
669
     */
670 60
    public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
671
    {
672 60
        $ticket = $this->getTicket($ticket);
673
674 60
        list($class_id, $method_id, $args) = $this->protocolWriter->queueDelete(
675 40
            $ticket,
676 40
            $queue,
677 40
            $if_unused,
678 40
            $if_empty,
679 20
            $nowait
680 40
        );
681
682 60
        $this->send_method_frame(array($class_id, $method_id), $args);
683
684 60
        if ($nowait) {
685
            return null;
686
        }
687
688 60
        return $this->wait(array(
689 60
            $this->waitHelper->get_wait('queue.delete_ok')
690 40
        ));
691
    }
692
693
    /**
694
     * Confirms deletion of a queue
695
     *
696
     * @param AMQPReader $reader
697
     * @return string
698
     */
699 60
    protected function queue_delete_ok($reader)
700
    {
701 60
        return $reader->read_long();
702
    }
703
704
    /**
705
     * Purges a queue
706
     *
707
     * @param string $queue
708
     * @param bool $nowait
709
     * @param int $ticket
710
     * @return mixed|null
711
     */
712 View Code Duplication
    public function queue_purge($queue = '', $nowait = false, $ticket = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
713
    {
714
        $ticket = $this->getTicket($ticket);
715
        list($class_id, $method_id, $args) = $this->protocolWriter->queuePurge($ticket, $queue, $nowait);
716
717
        $this->send_method_frame(array($class_id, $method_id), $args);
718
719
        if ($nowait) {
720
            return null;
721
        }
722
723
        return $this->wait(array(
724
            $this->waitHelper->get_wait('queue.purge_ok')
725
        ));
726
    }
727
728
    /**
729
     * Confirms a queue purge
730
     *
731
     * @param AMQPReader $reader
732
     * @return string
733
     */
734
    protected function queue_purge_ok($reader)
735
    {
736
        return $reader->read_long();
737
    }
738
739
    /**
740
     * Acknowledges one or more messages
741
     *
742
     * @param string $delivery_tag
743
     * @param bool $multiple
744
     */
745 12
    public function basic_ack($delivery_tag, $multiple = false)
746
    {
747 12
        list($class_id, $method_id, $args) = $this->protocolWriter->basicAck($delivery_tag, $multiple);
748 12
        $this->send_method_frame(array($class_id, $method_id), $args);
749 12
    }
750
751
    /**
752
     * Called when the server sends a basic.ack
753
     *
754
     * @param AMQPReader $reader
755
     * @throws AMQPRuntimeException
756
     */
757 6 View Code Duplication
    protected function basic_ack_from_server(AMQPReader $reader)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
758
    {
759 6
        $delivery_tag = $reader->read_longlong();
760 6
        $multiple = (bool) $reader->read_bit();
761
762 6
        if (!isset($this->published_messages[$delivery_tag])) {
763
            throw new AMQPRuntimeException(sprintf(
764
                'Server ack\'ed unknown delivery_tag "%s"',
765
                $delivery_tag
766
            ));
767
        }
768
769 6
        $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);
770 6
    }
771
772
    /**
773
     * Called when the server sends a basic.nack
774
     *
775
     * @param AMQPReader $reader
776
     * @throws AMQPRuntimeException
777
     */
778 View Code Duplication
    protected function basic_nack_from_server($reader)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
779
    {
780
        $delivery_tag = $reader->read_longlong();
781
        $multiple = (bool) $reader->read_bit();
782
783
        if (!isset($this->published_messages[$delivery_tag])) {
784
            throw new AMQPRuntimeException(sprintf(
785
                'Server nack\'ed unknown delivery_tag "%s"',
786
                $delivery_tag
787
            ));
788
        }
789
790
        $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);
791
    }
792
793
    /**
794
     * Handles the deletion of messages from this->publishedMessages and dispatches them to the $handler
795
     *
796
     * @param string $delivery_tag
797
     * @param bool $multiple
798
     * @param callable $handler
799
     */
800 6
    protected function internal_ack_handler($delivery_tag, $multiple, $handler)
801
    {
802 6
        if ($multiple) {
803
            $keys = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag);
804
805
            foreach ($keys as $key) {
806
                $this->internal_ack_handler($key, false, $handler);
807
            }
808
809
        } else {
810 6
            $message = $this->get_and_unset_message($delivery_tag);
811 6
            $message->delivery_info['delivery_tag'] = $delivery_tag;
812 6
            $this->dispatch_to_handler($handler, array($message));
813
        }
814 6
    }
815
816
    /**
817
     * @param AMQPMessage[] $messages
818
     * @param string $value
819
     * @return mixed
820
     */
821
    protected function get_keys_less_or_equal(array $messages, $value)
822
    {
823
        $keys = array_reduce(
824
            array_keys($messages),
825
826
            /**
827
             * @param string $key
828
             */
829
            function ($keys, $key) use ($value) {
830
                if (bccomp($key, $value, 0) <= 0) {
831
                    $keys[] = $key;
832
                }
833
834
                return $keys;
835
            },
836
            array()
837
        );
838
839
        return $keys;
840
    }
841
842
    /**
843
     * Rejects one or several received messages
844
     *
845
     * @param string $delivery_tag
846
     * @param bool $multiple
847
     * @param bool $requeue
848
     */
849
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
850
    {
851
        list($class_id, $method_id, $args) = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue);
852
        $this->send_method_frame(array($class_id, $method_id), $args);
853
    }
854
855
    /**
856
     * Ends a queue consumer
857
     *
858
     * @param string $consumer_tag
859
     * @param bool $nowait
860
     * @param bool $noreturn
861
     * @return mixed
862
     */
863 24 View Code Duplication
    public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
864
    {
865 24
        list($class_id, $method_id, $args) = $this->protocolWriter->basicCancel($consumer_tag, $nowait);
866 24
        $this->send_method_frame(array($class_id, $method_id), $args);
867
868 24
        if ($nowait || $noreturn) {
869
            unset($this->callbacks[$consumer_tag]);
870
            return $consumer_tag;
871
        }
872
873 24
        return $this->wait(array(
874 24
            $this->waitHelper->get_wait('basic.cancel_ok')
875 16
        ));
876
    }
877
878
    /**
879
     * @param AMQPReader $reader
880
     * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException
881
     */
882
    protected function basic_cancel_from_server(AMQPReader $reader)
883
    {
884
        throw new AMQPBasicCancelException($reader->read_shortstr());
885
    }
886
887
    /**
888
     * Confirm a cancelled consumer
889
     *
890
     * @param AMQPReader $reader
891
     * @return string
892
     */
893 24
    protected function basic_cancel_ok($reader)
894
    {
895 24
        $consumerTag = $reader->read_shortstr();
896 24
        unset($this->callbacks[$consumerTag]);
897
898 24
        return $consumerTag;
899
    }
900
901
    /**
902
     * Starts a queue consumer
903
     *
904
     * @param string $queue
905
     * @param string $consumer_tag
906
     * @param bool $no_local
907
     * @param bool $no_ack
908
     * @param bool $exclusive
909
     * @param bool $nowait
910
     * @param callback|null $callback
911
     * @param int|null $ticket
912
     * @param array $arguments
913
     * @return mixed|string
914
     */
915 24
    public function basic_consume(
916
        $queue = '',
917
        $consumer_tag = '',
918
        $no_local = false,
919
        $no_ack = false,
920
        $exclusive = false,
921
        $nowait = false,
922
        $callback = null,
923
        $ticket = null,
924
        $arguments = array()
925
    ) {
926 24
        $ticket = $this->getTicket($ticket);
927 24
        list($class_id, $method_id, $args) = $this->protocolWriter->basicConsume(
928 16
            $ticket,
929 16
            $queue,
930 16
            $consumer_tag,
931 16
            $no_local,
932 16
            $no_ack,
933 16
            $exclusive,
934 16
            $nowait,
935 24
            $this->protocolVersion == '0.9.1' ? $arguments : null
936 16
        );
937
938 24
        $this->send_method_frame(array($class_id, $method_id), $args);
939
940 24
        if (false === $nowait) {
941 24
            $consumer_tag = $this->wait(array(
942 24
                $this->waitHelper->get_wait('basic.consume_ok')
943 16
            ));
944 16
        }
945
946 24
        $this->callbacks[$consumer_tag] = $callback;
947
948 24
        return $consumer_tag;
949
    }
950
951
    /**
952
     * Confirms a new consumer
953
     *
954
     * @param AMQPReader $reader
955
     * @return string
956
     */
957 24
    protected function basic_consume_ok($reader)
958
    {
959 24
        return $reader->read_shortstr();
960
    }
961
962
    /**
963
     * Notifies the client of a consumer message
964
     *
965
     * @param AMQPReader $reader
966
     * @param AMQPMessage $message
967
     */
968 24
    protected function basic_deliver($reader, $message)
969
    {
970 24
        $consumer_tag = $reader->read_shortstr();
971 24
        $delivery_tag = $reader->read_longlong();
972 24
        $redelivered = $reader->read_bit();
973 24
        $exchange = $reader->read_shortstr();
974 24
        $routing_key = $reader->read_shortstr();
975
976 24
        $message->delivery_info = array(
977 24
            'channel' => $this,
978 24
            'consumer_tag' => $consumer_tag,
979 24
            'delivery_tag' => $delivery_tag,
980 24
            'redelivered' => $redelivered,
981 24
            'exchange' => $exchange,
982 8
            'routing_key' => $routing_key
983 16
        );
984
985 24
        if (isset($this->callbacks[$consumer_tag])) {
986 24
            call_user_func($this->callbacks[$consumer_tag], $message);
987 12
        }
988 18
    }
989
990
    /**
991
     * Direct access to a queue if no message was available in the queue, return null
992
     *
993
     * @param string $queue
994
     * @param bool $no_ack
995
     * @param int $ticket
996
     * @return mixed
997
     */
998 42
    public function basic_get($queue = '', $no_ack = false, $ticket = null)
999
    {
1000 42
        $ticket = $this->getTicket($ticket);
1001 42
        list($class_id, $method_id, $args) = $this->protocolWriter->basicGet($ticket, $queue, $no_ack);
1002
1003 42
        $this->send_method_frame(array($class_id, $method_id), $args);
1004
1005 42
        return $this->wait(array(
1006 42
            $this->waitHelper->get_wait('basic.get_ok'),
1007 42
            $this->waitHelper->get_wait('basic.get_empty')
1008 28
        ));
1009
    }
1010
1011
    /**
1012
     * Indicates no messages available
1013
     *
1014
     * @param AMQPReader $reader
1015
     */
1016
    protected function basic_get_empty($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1017
    {
1018
    }
1019
1020
    /**
1021
     * Provides client with a message
1022
     *
1023
     * @param AMQPReader $reader
1024
     * @param AMQPMessage $message
1025
     * @return AMQPMessage
1026
     */
1027 42
    protected function basic_get_ok($reader, $message)
1028
    {
1029 42
        $delivery_tag = $reader->read_longlong();
1030 42
        $redelivered = $reader->read_bit();
1031 42
        $exchange = $reader->read_shortstr();
1032 42
        $routing_key = $reader->read_shortstr();
1033 42
        $message_count = $reader->read_long();
1034
1035 42
        $message->delivery_info = array(
1036 42
            'delivery_tag' => $delivery_tag,
1037 42
            'redelivered' => $redelivered,
1038 42
            'exchange' => $exchange,
1039 42
            'routing_key' => $routing_key,
1040 14
            'message_count' => $message_count
1041 28
        );
1042
1043 42
        return $message;
1044
    }
1045
1046
    /**
1047
     * @param string $exchange
1048
     * @param string $routing_key
1049
     * @param $mandatory
1050
     * @param $immediate
1051
     * @param int $ticket
1052
     * @return mixed
1053
     */
1054 72
    private function pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket)
1055
    {
1056 72
        $cache_key = sprintf(
1057 72
            '%s|%s|%s|%s|%s',
1058 48
            $exchange,
1059 48
            $routing_key,
1060 48
            $mandatory,
1061 48
            $immediate,
1062 24
            $ticket
1063 48
        );
1064 72
        if (false === isset($this->publish_cache[$cache_key])) {
1065 72
            $ticket = $this->getTicket($ticket);
1066 72
            list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish(
1067 48
                $ticket,
1068 48
                $exchange,
1069 48
                $routing_key,
1070 48
                $mandatory,
1071 24
                $immediate
1072 48
            );
1073
1074 72
            $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args);
1075 72
            $this->publish_cache[$cache_key] = $pkt->getvalue();
1076 72
            if (count($this->publish_cache) > $this->publish_cache_max_size) {
1077
                reset($this->publish_cache);
1078
                $old_key = key($this->publish_cache);
1079
                unset($this->publish_cache[$old_key]);
1080
            }
1081 48
        }
1082
1083 72
        return $this->publish_cache[$cache_key];
1084
    }
1085
1086
    /**
1087
     * Publishes a message
1088
     *
1089
     * @param AMQPMessage $msg
1090
     * @param string $exchange
1091
     * @param string $routing_key
1092
     * @param bool $mandatory
1093
     * @param bool $immediate
1094
     * @param int $ticket
1095
     */
1096 72
    public function basic_publish(
1097
        $msg,
1098
        $exchange = '',
1099
        $routing_key = '',
1100
        $mandatory = false,
1101
        $immediate = false,
1102
        $ticket = null
1103
    ) {
1104 72
        $pkt = new AMQPWriter();
1105 72
        $pkt->write($this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket));
1106
1107 72
        $this->connection->send_content(
1108 72
            $this->channel_id,
1109 72
            60,
1110 72
            0,
1111 72
            mb_strlen($msg->body, 'ASCII'),
1112 72
            $msg->serialize_properties(),
1113 72
            $msg->body,
1114 24
            $pkt
1115 48
        );
1116
1117 72 View Code Duplication
        if ($this->next_delivery_tag > 0) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1118 6
            $this->published_messages[$this->next_delivery_tag] = $msg;
1119 6
            $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
0 ignored issues
show
Documentation Bug introduced by
The property $next_delivery_tag was declared of type integer, but bcadd($this->next_delivery_tag, '1', 0) is of type string. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
1120 4
        }
1121 72
    }
1122
1123
    /**
1124
     * @param AMQPMessage $msg
1125
     * @param string $exchange
1126
     * @param string $routing_key
1127
     * @param bool $mandatory
1128
     * @param bool $immediate
1129
     * @param int $ticket
1130
     */
1131
    public function batch_basic_publish(
1132
        $msg,
0 ignored issues
show
Unused Code introduced by
The parameter $msg is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1133
        $exchange = '',
0 ignored issues
show
Unused Code introduced by
The parameter $exchange is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1134
        $routing_key = '',
0 ignored issues
show
Unused Code introduced by
The parameter $routing_key is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1135
        $mandatory = false,
0 ignored issues
show
Unused Code introduced by
The parameter $mandatory is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1136
        $immediate = false,
0 ignored issues
show
Unused Code introduced by
The parameter $immediate is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1137
        $ticket = null
0 ignored issues
show
Unused Code introduced by
The parameter $ticket is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1138
    ) {
1139
        $this->batch_messages[] = func_get_args();
1140
    }
1141
1142
    /**
1143
     * Publish batch
1144
     *
1145
     * @return void
1146
     */
1147
    public function publish_batch()
1148
    {
1149
        if (empty($this->batch_messages)) {
1150
            return null;
1151
        }
1152
1153
        /** @var AMQPWriter $pkt */
1154
        $pkt = new AMQPWriter();
1155
1156
        /** @var AMQPMessage $msg */
1157
        foreach ($this->batch_messages as $m) {
1158
            $msg = $m[0];
1159
1160
            $exchange = isset($m[1]) ? $m[1] : '';
1161
            $routing_key = isset($m[2]) ? $m[2] : '';
1162
            $mandatory = isset($m[3]) ? $m[3] : false;
1163
            $immediate = isset($m[4]) ? $m[4] : false;
1164
            $ticket = isset($m[5]) ? $m[5] : null;
1165
            $pkt->write($this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket));
1166
1167
            $this->connection->prepare_content(
1168
                $this->channel_id,
1169
                60,
1170
                0,
1171
                mb_strlen($msg->body, 'ASCII'),
1172
                $msg->serialize_properties(),
1173
                $msg->body,
1174
                $pkt
1175
            );
1176
1177 View Code Duplication
            if ($this->next_delivery_tag > 0) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1178
                $this->published_messages[$this->next_delivery_tag] = $msg;
1179
                $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
0 ignored issues
show
Documentation Bug introduced by
The property $next_delivery_tag was declared of type integer, but bcadd($this->next_delivery_tag, '1', 0) is of type string. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
1180
            }
1181
        }
1182
1183
        //call write here
1184
        $this->connection->write($pkt->getvalue());
1185
        $this->batch_messages = array();
1186
    }
1187
1188
    /**
1189
     * Specifies QoS
1190
     *
1191
     * @param int $prefetch_size
1192
     * @param int $prefetch_count
1193
     * @param bool $a_global
1194
     * @return mixed
1195
     */
1196
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)
1197
    {
1198
        list($class_id, $method_id, $args) = $this->protocolWriter->basicQos(
1199
            $prefetch_size,
1200
            $prefetch_count,
1201
            $a_global
1202
        );
1203
1204
        $this->send_method_frame(array($class_id, $method_id), $args);
1205
1206
        return $this->wait(array(
1207
            $this->waitHelper->get_wait('basic.qos_ok')
1208
        ));
1209
    }
1210
1211
    /**
1212
     * Confirms QoS request
1213
     * @param AMQPReader $reader
1214
     */
1215
    protected function basic_qos_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1216
    {
1217
    }
1218
1219
    /**
1220
     * Redelivers unacknowledged messages
1221
     *
1222
     * @param bool $requeue
1223
     * @return mixed
1224
     */
1225
    public function basic_recover($requeue = false)
1226
    {
1227
        list($class_id, $method_id, $args) = $this->protocolWriter->basicRecover($requeue);
1228
        $this->send_method_frame(array($class_id, $method_id), $args);
1229
1230
        return $this->wait(array(
1231
            $this->waitHelper->get_wait('basic.recover_ok')
1232
        ));
1233
    }
1234
1235
    /**
1236
     * Confirm the requested recover
1237
     * @param AMQPReader $reader
1238
     */
1239
    protected function basic_recover_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1240
    {
1241
    }
1242
1243
    /**
1244
     * Rejects an incoming message
1245
     *
1246
     * @param string $delivery_tag
1247
     * @param bool $requeue
1248
     */
1249
    public function basic_reject($delivery_tag, $requeue)
1250
    {
1251
        list($class_id, $method_id, $args) = $this->protocolWriter->basicReject($delivery_tag, $requeue);
1252
        $this->send_method_frame(array($class_id, $method_id), $args);
1253
    }
1254
1255
    /**
1256
     * Returns a failed message
1257
     *
1258
     * @param AMQPReader $reader
1259
     * @param AMQPMessage $message
1260
     * @return null
1261
     */
1262
    protected function basic_return($reader, $message)
1263
    {
1264
        $callback = $this->basic_return_callback;
1265
        if (!is_callable($callback)) {
1266
            $this->debug->debug_msg('Skipping unhandled basic_return message');
1267
            return null;
1268
        }
1269
1270
        $reply_code = $reader->read_short();
1271
        $reply_text = $reader->read_shortstr();
1272
        $exchange = $reader->read_shortstr();
1273
        $routing_key = $reader->read_shortstr();
1274
1275
        call_user_func_array($callback, array(
1276
            $reply_code,
1277
            $reply_text,
1278
            $exchange,
1279
            $routing_key,
1280
            $message,
1281
        ));
1282
    }
1283
1284
    /**
1285
     * @return mixed
1286
     */
1287
    public function tx_commit()
1288
    {
1289
        $this->send_method_frame(array(90, 20));
1290
1291
        return $this->wait(array(
1292
            $this->waitHelper->get_wait('tx.commit_ok')
1293
        ));
1294
    }
1295
1296
    /**
1297
     * Confirms a successful commit
1298
     * @param AMQPReader $reader
1299
     */
1300
    protected function tx_commit_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1301
    {
1302
    }
1303
1304
    /**
1305
     * Rollbacks the current transaction
1306
     *
1307
     * @return mixed
1308
     */
1309
    public function tx_rollback()
1310
    {
1311
        $this->send_method_frame(array(90, 30));
1312
1313
        return $this->wait(array(
1314
            $this->waitHelper->get_wait('tx.rollback_ok')
1315
        ));
1316
    }
1317
1318
    /**
1319
     * Confirms a successful rollback
1320
     *
1321
     * @param AMQPReader $reader
1322
     */
1323
    protected function tx_rollback_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1324
    {
1325
    }
1326
1327
    /**
1328
     * Puts the channel into confirm mode
1329
     * Beware that only non-transactional channels may be put into confirm mode and vice versa
1330
     *
1331
     * @param bool $nowait
1332
     * @return null
1333
     */
1334 6 View Code Duplication
    public function confirm_select($nowait = false)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1335
    {
1336 6
        list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);
1337
1338 6
        $this->send_method_frame(array($class_id, $method_id), $args);
1339
1340 6
        if ($nowait) {
1341
            return null;
1342
        }
1343
1344 6
        $this->wait(array($this->waitHelper->get_wait('confirm.select_ok')));
1345 6
        $this->next_delivery_tag = 1;
1346 6
    }
1347
1348
    /**
1349
     * Confirms a selection
1350
     *
1351
     * @param AMQPReader $reader
1352
     */
1353 6
    public function confirm_select_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1354
    {
1355 6
    }
1356
1357
    /**
1358
     * Waits for pending acks and nacks from the server.
1359
     * If there are no pending acks, the method returns immediately
1360
     *
1361
     * @param int $timeout Waits until $timeout value is reached
1362
     */
1363 View Code Duplication
    public function wait_for_pending_acks($timeout = 0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1364
    {
1365
        $functions = array(
1366
            $this->waitHelper->get_wait('basic.ack'),
1367
            $this->waitHelper->get_wait('basic.nack'),
1368
        );
1369
1370
        while (count($this->published_messages) !== 0) {
1371
            if ($timeout > 0) {
1372
                $this->wait($functions, true, $timeout);
1373
            } else {
1374
                $this->wait($functions);
1375
            }
1376
        }
1377
    }
1378
1379
    /**
1380
     * Waits for pending acks, nacks and returns from the server. If there are no pending acks, the method returns immediately.
1381
     *
1382
     * @param int $timeout If set to value > 0 the method will wait at most $timeout seconds for pending acks.
1383
     */
1384 6 View Code Duplication
    public function wait_for_pending_acks_returns($timeout = 0)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1385
    {
1386
        $functions = array(
1387 6
            $this->waitHelper->get_wait('basic.ack'),
1388 6
            $this->waitHelper->get_wait('basic.nack'),
1389 6
            $this->waitHelper->get_wait('basic.return'),
1390 4
        );
1391
1392 6
        while (count($this->published_messages) !== 0) {
1393 6
            if ($timeout > 0) {
1394 6
                $this->wait($functions, true, $timeout);
1395 4
            } else {
1396
                $this->wait($functions);
1397
            }
1398 4
        }
1399 6
    }
1400
1401
    /**
1402
     * Selects standard transaction mode
1403
     *
1404
     * @return mixed
1405
     */
1406
    public function tx_select()
1407
    {
1408
        $this->send_method_frame(array(90, 10));
1409
1410
        return $this->wait(array(
1411
            $this->waitHelper->get_wait('tx.select_ok')
1412
        ));
1413
    }
1414
1415
    /**
1416
     * Confirms transaction mode
1417
     * @param AMQPReader $reader
1418
     */
1419
    protected function tx_select_ok($reader)
0 ignored issues
show
Unused Code introduced by
The parameter $reader is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
1420
    {
1421
    }
1422
1423
    /**
1424
     * @param array $arguments
1425
     * @return array
1426
     */
1427 84
    protected function getArguments($arguments)
1428
    {
1429 84
        return (null === $arguments) ? array() : $arguments;
1430
    }
1431
1432
    /**
1433
     * @param int $ticket
1434
     * @return int
1435
     */
1436 84
    protected function getTicket($ticket)
1437
    {
1438 84
        return (null === $ticket) ? $this->default_ticket : $ticket;
1439
    }
1440
1441
    /**
1442
     * Helper method to get a particular method from $this->publishedMessages, removes it from the array and returns it.
1443
     *
1444
     * @param int $index
1445
     * @return AMQPMessage
1446
     */
1447 6
    protected function get_and_unset_message($index)
1448
    {
1449 6
        $message = $this->published_messages[$index];
1450 6
        unset($this->published_messages[$index]);
1451
1452 6
        return $message;
1453
    }
1454
1455
    /**
1456
     * Sets callback for basic_return
1457
     *
1458
     * @param  callable $callback
1459
     * @throws \InvalidArgumentException if $callback is not callable
1460
     */
1461 View Code Duplication
    public function set_return_listener($callback)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1462
    {
1463
        if (!is_callable($callback)) {
1464
            throw new \InvalidArgumentException(sprintf(
1465
                'Given callback "%s" should be callable. %s type was given.',
1466
                $callback,
1467
                gettype($callback)
1468
            ));
1469
        }
1470
1471
        $this->basic_return_callback = $callback;
1472
    }
1473
1474
    /**
1475
     * Sets a handler which called for any message nack'ed by the server, with the AMQPMessage as first argument.
1476
     *
1477
     * @param callable $callback
1478
     * @throws \InvalidArgumentException
1479
     */
1480 View Code Duplication
    public function set_nack_handler($callback)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1481
    {
1482
        if (!is_callable($callback)) {
1483
            throw new \InvalidArgumentException(sprintf(
1484
                'Given callback "%s" should be callable. %s type was given.',
1485
                $callback,
1486
                gettype($callback)
1487
            ));
1488
        }
1489
1490
        $this->nack_handler = $callback;
1491
    }
1492
1493
    /**
1494
     * Sets a handler which called for any message ack'ed by the server, with the AMQPMessage as first argument.
1495
     *
1496
     * @param callable $callback
1497
     * @throws \InvalidArgumentException
1498
     */
1499 6 View Code Duplication
    public function set_ack_handler($callback)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
1500
    {
1501 6
        if (!is_callable($callback)) {
1502
            throw new \InvalidArgumentException(sprintf(
1503
                'Given callback "%s" should be callable. %s type was given.',
1504
                $callback,
1505
                gettype($callback)
1506
            ));
1507
        }
1508
1509 6
        $this->ack_handler = $callback;
1510 6
    }
1511
}
1512