AMQPChannel::internal_ack_handler()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 2
Bugs 1 Features 1
Metric Value
c 2
b 1
f 1
dl 0
loc 14
ccs 0
cts 10
cp 0
rs 9.4285
cc 3
eloc 8
nc 3
nop 3
crap 12
1
<?php
2
namespace PhpAmqpLib\Channel;
3
4
use PhpAmqpLib\Exception\AMQPBasicCancelException;
5
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
6
use PhpAmqpLib\Exception\AMQPRuntimeException;
7
use PhpAmqpLib\Helper\MiscHelper;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use PhpAmqpLib\Wire\AMQPReader;
10
use PhpAmqpLib\Wire\AMQPWriter;
11
12
class AMQPChannel extends AbstractChannel
13
{
14
    /** @var array Registered callbacks; entries are removed by consumer tag in basic_cancel(). */
    public $callbacks = array();

    /** @var bool Whether or not the channel has been "opened" */
    protected $is_open = false;

    /** @var int Ticket used by default; reset to 0 in the constructor and updated by access_request_ok(). */
    protected $default_ticket;

    /** @var bool Flow-control flag; true after construction, updated by channel_flow(). */
    protected $active;

    /** @var array Alerts collected by channel_alert() as array(reply_code, reply_text, details). */
    protected $alerts;

    /** @var bool Value of the $auto_decode constructor argument. */
    protected $auto_decode;

    /**
     * Callback invoked in case of basic_return. It receives these parameters:
     *    param int $reply_code
     *    param string $reply_text
     *    param string $exchange
     *    param string $routing_key
     *    param AMQPMessage $msg
     *
     * @var callable
     */
    protected $basic_return_callback;

    /** @var array Messages queued up for the next batch publish. */
    protected $batch_messages = array();

    /**
     * While the channel is in confirm_publish mode, every published message is kept here
     * until the server ack's or nack's it.
     *
     * @var AMQPMessage[]
     */
    private $published_messages = array();

    /** @var int */
    private $next_delivery_tag = 0;

    /** @var callable Handler passed to internal_ack_handler() when the server ack's a message. */
    private $ack_handler;

    /** @var callable Handler passed to internal_ack_handler() when the server nack's a message. */
    private $nack_handler;

    /**
     * Circular buffer that speeds up both basic_publish() and publish_batch().
     * Its size is capped by $publish_cache_max_size.
     *
     * @var array
     * @see basic_publish()
     * @see publish_batch()
     */
    private $publish_cache;

    /**
     * Upper bound for the number of entries in $publish_cache (set to 100 by the constructor).
     *
     * @var int
     */
    private $publish_cache_max_size;
80
81
    /**
     * Creates the channel on the given connection and immediately opens it.
     *
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
     * @param int|null $channel_id Channel id to use; when it loosely equals null a free id
     *                             is requested from the connection
     * @param bool $auto_decode
     * @throws \Exception Whatever x_open() throws; the channel is closed before rethrowing
     */
    public function __construct($connection, $channel_id = null, $auto_decode = true)
    {
        // Loose comparison kept on purpose: any value that == null triggers the lookup.
        if (null == $channel_id) {
            $channel_id = $connection->get_free_channel_id();
        }

        parent::__construct($connection, $channel_id);

        // Publish cache starts empty and is capped at 100 entries.
        $this->publish_cache = array();
        $this->publish_cache_max_size = 100;

        $this->debug->debug_msg('using channel_id: ' . $channel_id);

        // Initial channel state.
        $this->default_ticket = 0;
        $this->is_open = false;
        $this->active = true; // Flow control
        $this->alerts = array();
        $this->callbacks = array();
        $this->auto_decode = $auto_decode;

        try {
            $this->x_open();
        } catch (\Exception $open_failure) {
            // Release the channel before letting the caller see the failure.
            $this->close();
            throw $open_failure;
        }
    }
114
115
    /**
     * Tears down the local channel state once closing has been agreed with the server.
     *
     * Removes this channel from the connection's channel list (when a channel id is
     * still set), drops the connection and channel id, and marks the channel as closed.
     */
    protected function do_close()
    {
        if (null !== $this->channel_id) {
            unset($this->connection->channels[$this->channel_id]);
        }

        $this->connection = null;
        $this->channel_id = null;
        $this->is_open = false;
    }
126
127
    /**
     * Only for AMQP0.8.0
     *
     * Lets the server send a non-fatal warning to the client. It is used for
     * methods that are normally asynchronous and therefore have no confirmation,
     * but for which the server may still detect errors worth reporting. Fatal
     * errors are raised as channel or connection exceptions; non-fatal ones
     * arrive here and are appended to $this->alerts.
     *
     * @param AMQPReader $args Frame arguments: reply code (short), reply text (shortstr), details (table)
     */
    protected function channel_alert($args)
    {
        // Fields must be consumed in wire order.
        $code = $args->read_short();
        $text = $args->read_shortstr();
        $table = $args->read_table();

        $alert = array($code, $text, $table);
        array_push($this->alerts, $alert);
    }
145
146
    /**
     * Requests a channel close and waits for the server's channel.close_ok.
     *
     * When the channel is not open or no longer has a connection, only the local
     * state is torn down via do_close() and null is returned.
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig Two-element array; both elements are passed on to channelClose()
     * @return mixed Null when already closed, otherwise the result of wait()
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        $already_closed = ($this->is_open === false || $this->connection === null);
        if ($already_closed) {
            $this->do_close();

            return null; // already closed
        }

        list($close_class, $close_method, $payload) = $this->protocolWriter->channelClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );
        $this->send_method_frame(array($close_class, $close_method), $payload);

        $expected = array($this->waitHelper->get_wait('channel.close_ok'));

        return $this->wait($expected);
    }
174
175
    /**
     * Handles a channel.close sent by the server.
     *
     * Confirms the close with method frame (20, 41), tears down the local state and
     * then always raises an exception carrying the server's reply.
     *
     * @param AMQPReader $args Frame arguments: reply code, reply text, class id, method id
     * @throws \PhpAmqpLib\Exception\AMQPProtocolChannelException
     */
    protected function channel_close($args)
    {
        // Fields must be consumed in wire order.
        $code = $args->read_short();
        $text = $args->read_shortstr();
        $failed_class = $args->read_short();
        $failed_method = $args->read_short();

        $this->send_method_frame(array(20, 41));
        $this->do_close();

        $failed_sig = array($failed_class, $failed_method);

        throw new AMQPProtocolChannelException($code, $text, $failed_sig);
    }
191
192
    /**
     * Confirms a channel close.
     * Alias of AMQPChannel::do_close()
     *
     * @param AMQPReader $args Not read by this method
     */
    protected function channel_close_ok($args)
    {
        // Nothing to parse; just drop the local channel state.
        $this->do_close();
    }
202
203
    /**
     * Enables/disables flow from peer and waits for channel.flow_ok.
     *
     * @param bool $active Passed on to channelFlow()
     * @return mixed Result of wait()
     */
    public function flow($active)
    {
        list($flow_class, $flow_method, $payload) = $this->protocolWriter->channelFlow($active);
        $this->send_method_frame(array($flow_class, $flow_method), $payload);

        $expected = array($this->waitHelper->get_wait('channel.flow_ok'));

        return $this->wait($expected);
    }
218
219
    /**
     * Handles a channel.flow sent by the server: stores the requested state in
     * $this->active and answers through x_flow_ok().
     *
     * @param AMQPReader $args Frame arguments; a single bit is read
     */
    protected function channel_flow($args)
    {
        $requested = $args->read_bit();

        $this->active = $requested;
        $this->x_flow_ok($requested);
    }
227
228
    /**
     * Answers a channel.flow request from the server with channel.flow-ok.
     *
     * The previous implementation built its frame with channelFlow(), i.e. it sent
     * another channel.flow request back instead of the confirmation. The confirmation
     * is method frame (20, 21) carrying the same single "active" bit, so the frame is
     * assembled here directly, the same way channel_close() sends (20, 41).
     *
     * @param bool $active Flow state being confirmed
     */
    protected function x_flow_ok($active)
    {
        $args = new AMQPWriter();
        $args->write_bits(array($active));

        // 20 = channel class, 21 = flow-ok method.
        $this->send_method_frame(array(20, 21), $args);
    }
236
237
    /**
     * Handles channel.flow_ok by reading the single bit the frame carries.
     *
     * @param AMQPReader $args
     * @return bool The bit read from the frame
     */
    protected function channel_flow_ok($args)
    {
        $confirmed = $args->read_bit();

        return $confirmed;
    }
245
246
    /**
     * Sends channel.open and waits for channel.open_ok; does nothing when the
     * channel is already open.
     *
     * @param string $out_of_band Passed on to channelOpen()
     * @return mixed Null when already open, otherwise the result of wait()
     */
    protected function x_open($out_of_band = '')
    {
        if ($this->is_open) {
            return null;
        }

        list($open_class, $open_method, $payload) = $this->protocolWriter->channelOpen($out_of_band);
        $this->send_method_frame(array($open_class, $open_method), $payload);

        $expected = array($this->waitHelper->get_wait('channel.open_ok'));

        return $this->wait($expected);
    }
263
264
    /**
     * Handles channel.open_ok: marks the channel as open and logs it.
     *
     * @param AMQPReader $args Not read by this method
     */
    protected function channel_open_ok($args)
    {
        // Flag first, then log, so the state is already updated when the message is emitted.
        $this->is_open = true;

        $this->debug->debug_msg('Channel open');
    }
273
274
    /**
     * Requests an access ticket and waits for access.request_ok.
     *
     * @param string $realm
     * @param bool $exclusive
     * @param bool $passive
     * @param bool $active
     * @param bool $write
     * @param bool $read
     * @return mixed Result of wait()
     */
    public function access_request(
        $realm,
        $exclusive = false,
        $passive = false,
        $active = false,
        $write = false,
        $read = false
    ) {
        list($req_class, $req_method, $payload) = $this->protocolWriter->accessRequest(
            $realm,
            $exclusive,
            $passive,
            $active,
            $write,
            $read
        );
        $this->send_method_frame(array($req_class, $req_method), $payload);

        $expected = array($this->waitHelper->get_wait('access.request_ok'));

        return $this->wait($expected);
    }
308
309
    /**
     * Grants access to server resources: the ticket read from the frame becomes
     * the channel's default ticket.
     *
     * @param AMQPReader $args Frame arguments; one short is read
     * @return int The new default ticket
     */
    protected function access_request_ok($args)
    {
        $ticket = $args->read_short();
        $this->default_ticket = $ticket;

        return $ticket;
    }
321
322
    /**
     * Declares an exchange.
     *
     * @param string $exchange
     * @param string $type
     * @param bool $passive
     * @param bool $durable
     * @param bool $auto_delete Defaults to true
     * @param bool $internal
     * @param bool $nowait When truthy the method returns right after sending the frame
     * @param mixed $arguments Run through getArguments() before use
     * @param mixed $ticket Run through getTicket() before use
     * @return mixed|null Null with $nowait, otherwise the result of waiting for exchange.declare_ok
     */
    public function exchange_declare(
        $exchange,
        $type,
        $passive = false,
        $durable = false,
        $auto_delete = true,
        $internal = false,
        $nowait = false,
        $arguments = null,
        $ticket = null
    ) {
        $arguments = $this->getArguments($arguments);
        $ticket = $this->getTicket($ticket);

        list($decl_class, $decl_method, $payload) = $this->protocolWriter->exchangeDeclare(
            $ticket,
            $exchange,
            $type,
            $passive,
            $durable,
            $auto_delete,
            $internal,
            $nowait,
            $arguments
        );
        $this->send_method_frame(array($decl_class, $decl_method), $payload);

        // With nowait there is no reply to wait for.
        return $nowait
            ? null
            : $this->wait(array($this->waitHelper->get_wait('exchange.declare_ok')));
    }
370
371
    /**
     * Confirms an exchange declaration.
     *
     * @param AMQPReader $args Not read by this method
     */
    protected function exchange_declare_ok($args)
    {
        // Intentionally empty: the confirmation needs no processing.
    }
377
378
    /**
     * Deletes an exchange.
     *
     * @param string $exchange
     * @param bool $if_unused
     * @param bool $nowait When truthy the method returns right after sending the frame
     * @param mixed $ticket Run through getTicket() before use
     * @return mixed|null Null with $nowait, otherwise the result of waiting for exchange.delete_ok
     */
    public function exchange_delete(
        $exchange,
        $if_unused = false,
        $nowait = false,
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($del_class, $del_method, $payload) = $this->protocolWriter->exchangeDelete(
            $ticket,
            $exchange,
            $if_unused,
            $nowait
        );
        $this->send_method_frame(array($del_class, $del_method), $payload);

        // With nowait there is no reply to wait for.
        return $nowait
            ? null
            : $this->wait(array($this->waitHelper->get_wait('exchange.delete_ok')));
    }
411
412
    /**
     * Confirms deletion of an exchange.
     *
     * @param AMQPReader $args Not read by this method
     */
    protected function exchange_delete_ok($args)
    {
        // Intentionally empty: the confirmation needs no processing.
    }
418
419
    /**
     * Binds dest exchange to source exchange.
     *
     * NOTE: static analysis of the project reports that exchangeBind() exists only in
     * PhpAmqpLib\Helper\Protocol\Protocol091, not in Protocol080, so this call depends
     * on which protocol writer is in use.
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait When truthy the method returns right after sending the frame
     * @param mixed $arguments Run through getArguments() before use
     * @param mixed $ticket Run through getTicket() before use
     * @return mixed|null Null with $nowait, otherwise the result of waiting for exchange.bind_ok
     */
    public function exchange_bind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = null,
        $ticket = null
    ) {
        $arguments = $this->getArguments($arguments);
        $ticket = $this->getTicket($ticket);

        list($bind_class, $bind_method, $payload) = $this->protocolWriter->exchangeBind(
            $ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );
        $this->send_method_frame(array($bind_class, $bind_method), $payload);

        // With nowait there is no reply to wait for.
        return $nowait
            ? null
            : $this->wait(array($this->waitHelper->get_wait('exchange.bind_ok')));
    }
460
461
    /**
     * Confirms that an exchange-to-exchange bind succeeded.
     *
     * @param AMQPReader $args Not read by this method
     */
    protected function exchange_bind_ok($args)
    {
        // Intentionally empty: the confirmation needs no processing.
    }
467
468
    /**
     * Unbinds dest exchange from source exchange and waits for exchange.unbind_ok.
     *
     * NOTE: static analysis of the project reports that exchangeUnbind() exists only in
     * PhpAmqpLib\Helper\Protocol\Protocol091, not in Protocol080, so this call depends
     * on which protocol writer is in use.
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param mixed $arguments Run through getArguments() before use
     * @param mixed $ticket Run through getTicket() before use
     * @return mixed Result of wait()
     */
    public function exchange_unbind($destination, $source, $routing_key = '', $arguments = null, $ticket = null)
    {
        $arguments = $this->getArguments($arguments);
        $ticket = $this->getTicket($ticket);

        list($unbind_class, $unbind_method, $payload) = $this->protocolWriter->exchangeUnbind(
            $ticket,
            $destination,
            $source,
            $routing_key,
            $arguments
        );
        $this->send_method_frame(array($unbind_class, $unbind_method), $payload);

        $expected = array($this->waitHelper->get_wait('exchange.unbind_ok'));

        return $this->wait($expected);
    }
497
498
    /**
     * Confirms that an exchange-to-exchange unbind succeeded.
     *
     * @param AMQPReader $args Not read by this method
     */
    protected function exchange_unbind_ok($args)
    {
        // Intentionally empty: the confirmation needs no processing.
    }
504
505
    /**
     * Binds queue to an exchange.
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param bool $nowait When truthy the method returns right after sending the frame
     * @param mixed $arguments Run through getArguments() before use
     * @param mixed $ticket Run through getTicket() before use
     * @return mixed|null Null with $nowait, otherwise the result of waiting for queue.bind_ok
     */
    public function queue_bind($queue, $exchange, $routing_key = '', $nowait = false, $arguments = null, $ticket = null)
    {
        $arguments = $this->getArguments($arguments);
        $ticket = $this->getTicket($ticket);

        list($bind_class, $bind_method, $payload) = $this->protocolWriter->queueBind(
            $ticket,
            $queue,
            $exchange,
            $routing_key,
            $nowait,
            $arguments
        );
        $this->send_method_frame(array($bind_class, $bind_method), $payload);

        // With nowait there is no reply to wait for.
        return $nowait
            ? null
            : $this->wait(array($this->waitHelper->get_wait('queue.bind_ok')));
    }
540
541
    /**
     * Confirms that a queue bind succeeded.
     *
     * @param AMQPReader $args Not read by this method
     */
    protected function queue_bind_ok($args)
    {
        // Intentionally empty: the confirmation needs no processing.
    }
547
548
    /**
     * Unbinds queue from an exchange and waits for queue.unbind_ok.
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param mixed $arguments Run through getArguments() before use
     * @param mixed $ticket Run through getTicket() before use
     * @return mixed Result of wait()
     */
    public function queue_unbind($queue, $exchange, $routing_key = '', $arguments = null, $ticket = null)
    {
        $arguments = $this->getArguments($arguments);
        $ticket = $this->getTicket($ticket);

        list($unbind_class, $unbind_method, $payload) = $this->protocolWriter->queueUnbind(
            $ticket,
            $queue,
            $exchange,
            $routing_key,
            $arguments
        );
        $this->send_method_frame(array($unbind_class, $unbind_method), $payload);

        $expected = array($this->waitHelper->get_wait('queue.unbind_ok'));

        return $this->wait($expected);
    }
577
578
    /**
     * Confirms that a queue unbind succeeded.
     *
     * @param AMQPReader $args Not read by this method
     */
    protected function queue_unbind_ok($args)
    {
        // Intentionally empty: the confirmation needs no processing.
    }
584
585
    /**
     * Declares queue, creates if needed.
     *
     * @param string $queue
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param bool $auto_delete Defaults to true
     * @param bool $nowait When truthy the method returns right after sending the frame
     * @param mixed $arguments Run through getArguments() before use
     * @param mixed $ticket Run through getTicket() before use
     * @return mixed|null Null with $nowait, otherwise the result of waiting for queue.declare_ok
     */
    public function queue_declare(
        $queue = '',
        $passive = false,
        $durable = false,
        $exclusive = false,
        $auto_delete = true,
        $nowait = false,
        $arguments = null,
        $ticket = null
    ) {
        $arguments = $this->getArguments($arguments);
        $ticket = $this->getTicket($ticket);

        list($decl_class, $decl_method, $payload) = $this->protocolWriter->queueDeclare(
            $ticket,
            $queue,
            $passive,
            $durable,
            $exclusive,
            $auto_delete,
            $nowait,
            $arguments
        );
        $this->send_method_frame(array($decl_class, $decl_method), $payload);

        // With nowait there is no reply to wait for.
        return $nowait
            ? null
            : $this->wait(array($this->waitHelper->get_wait('queue.declare_ok')));
    }
632
633
    /**
     * Confirms a queue definition.
     *
     * @param AMQPReader $args Frame arguments: queue name (shortstr), message count (long), consumer count (long)
     * @return array array(queue name, message count, consumer count)
     */
    protected function queue_declare_ok($args)
    {
        // Fields must be consumed in wire order.
        $name = $args->read_shortstr();
        $messages = $args->read_long();
        $consumers = $args->read_long();

        return array($name, $messages, $consumers);
    }
647
648
    /**
     * Deletes a queue.
     *
     * @param string $queue
     * @param bool $if_unused
     * @param bool $if_empty
     * @param bool $nowait When truthy the method returns right after sending the frame
     * @param mixed $ticket Run through getTicket() before use
     * @return mixed|null Null with $nowait, otherwise the result of waiting for queue.delete_ok
     */
    public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);

        list($del_class, $del_method, $payload) = $this->protocolWriter->queueDelete(
            $ticket,
            $queue,
            $if_unused,
            $if_empty,
            $nowait
        );
        $this->send_method_frame(array($del_class, $del_method), $payload);

        // With nowait there is no reply to wait for.
        return $nowait
            ? null
            : $this->wait(array($this->waitHelper->get_wait('queue.delete_ok')));
    }
680
681
    /**
     * Confirms deletion of a queue.
     *
     * @param AMQPReader $args Frame arguments; one long is read
     * @return mixed The value read with read_long() (presumably the number of deleted messages — confirm)
     */
    protected function queue_delete_ok($args)
    {
        $count = $args->read_long();

        return $count;
    }
691
692
    /**
     * Purges a queue.
     *
     * @param string $queue
     * @param bool $nowait When truthy the method returns right after sending the frame
     * @param mixed $ticket Run through getTicket() before use
     * @return mixed|null Null with $nowait, otherwise the result of waiting for queue.purge_ok
     */
    public function queue_purge($queue = '', $nowait = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);

        list($purge_class, $purge_method, $payload) = $this->protocolWriter->queuePurge($ticket, $queue, $nowait);
        $this->send_method_frame(array($purge_class, $purge_method), $payload);

        // With nowait there is no reply to wait for.
        return $nowait
            ? null
            : $this->wait(array($this->waitHelper->get_wait('queue.purge_ok')));
    }
715
716
    /**
     * Confirms a queue purge.
     *
     * @param AMQPReader $args Frame arguments; one long is read
     * @return mixed The value read with read_long() (presumably the number of purged messages — confirm)
     */
    protected function queue_purge_ok($args)
    {
        $count = $args->read_long();

        return $count;
    }
726
727
    /**
     * Acknowledges one or more messages. The frame is sent without waiting for a reply.
     *
     * @param string $delivery_tag
     * @param bool $multiple
     */
    public function basic_ack($delivery_tag, $multiple = false)
    {
        list($ack_class, $ack_method, $payload) = $this->protocolWriter->basicAck($delivery_tag, $multiple);

        $method_sig = array($ack_class, $ack_method);
        $this->send_method_frame($method_sig, $payload);
    }
738
739
    /**
     * Called when the server sends a basic.ack.
     *
     * The acknowledged delivery tag must belong to a message still held in
     * $this->published_messages; the message(s) are then handed to the ack handler.
     *
     * @param AMQPReader $args Frame arguments: delivery tag (longlong), multiple (bit)
     * @throws AMQPRuntimeException When the delivery tag is unknown
     */
    protected function basic_ack_from_server(AMQPReader $args)
    {
        $delivery_tag = $args->read_longlong();
        $multiple = (bool) $args->read_bit();

        if (!isset($this->published_messages[$delivery_tag])) {
            $error = sprintf(
                'Server ack\'ed unknown delivery_tag "%s"',
                $delivery_tag
            );

            throw new AMQPRuntimeException($error);
        }

        $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);
    }
759
760
    /**
     * Called when the server sends a basic.nack.
     *
     * The rejected delivery tag must belong to a message still held in
     * $this->published_messages; the message(s) are then handed to the nack handler.
     *
     * @param AMQPReader $args Frame arguments: delivery tag (longlong), multiple (bit)
     * @throws AMQPRuntimeException When the delivery tag is unknown
     */
    protected function basic_nack_from_server($args)
    {
        $delivery_tag = $args->read_longlong();
        $multiple = (bool) $args->read_bit();

        if (!isset($this->published_messages[$delivery_tag])) {
            $error = sprintf(
                'Server nack\'ed unknown delivery_tag "%s"',
                $delivery_tag
            );

            throw new AMQPRuntimeException($error);
        }

        $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);
    }
780
781
    /**
     * Removes message(s) from $this->published_messages and dispatches them to $handler.
     *
     * With $multiple set, every stored delivery tag less than or equal to
     * $delivery_tag is processed one by one; otherwise only $delivery_tag itself.
     *
     * @param string $delivery_tag
     * @param bool $multiple
     * @param mixed $handler Passed through to dispatch_to_handler()
     */
    protected function internal_ack_handler($delivery_tag, $multiple, $handler)
    {
        if (!$multiple) {
            // Single message: take it out of the pending list and hand it over.
            $message = $this->get_and_unset_message($delivery_tag);
            $this->dispatch_to_handler($handler, array($message));

            return;
        }

        // Multiple: expand into one single-message call per matching tag.
        $matching_tags = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag);
        foreach ($matching_tags as $tag) {
            $this->internal_ack_handler($tag, false, $handler);
        }
    }
802
803
    /**
     * Collects the keys of $array that are numerically less than or equal to $value.
     *
     * Keys are compared with bccomp() at scale 0 and returned in the order in which
     * they appear in $array.
     *
     * @param array $array
     * @param mixed $value Upper bound, in a form bccomp() accepts
     * @return array List of matching keys
     */
    protected function get_keys_less_or_equal(array $array, $value)
    {
        $matching = array();

        foreach (array_keys($array) as $candidate) {
            if (bccomp($candidate, $value, 0) <= 0) {
                $matching[] = $candidate;
            }
        }

        return $matching;
    }
824
825
    /**
     * Rejects one or several received messages. The frame is sent without waiting for a reply.
     *
     * NOTE: static analysis of the project reports that basicNack() exists only in
     * PhpAmqpLib\Helper\Protocol\Protocol091, not in Protocol080, so this call depends
     * on which protocol writer is in use.
     *
     * @param string $delivery_tag
     * @param bool $multiple
     * @param bool $requeue
     */
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
    {
        list($nack_class, $nack_method, $payload) = $this->protocolWriter->basicNack(
            $delivery_tag,
            $multiple,
            $requeue
        );

        $method_sig = array($nack_class, $nack_method);
        $this->send_method_frame($method_sig, $payload);
    }
837
838
    /**
     * Ends a queue consumer
     *
     * When either $nowait or $noreturn is set, the local callback registered
     * for the consumer tag is dropped immediately and the tag is returned
     * without waiting for a reply. Otherwise the result of waiting for
     * basic.cancel_ok is returned.
     *
     * @param string $consumer_tag
     * @param bool $nowait
     * @param bool $noreturn
     * @return mixed
     */
    public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false)
    {
        // The writer returns array(class id, method id, encoded arguments).
        $frame = $this->protocolWriter->basicCancel($consumer_tag, $nowait);
        $this->send_method_frame(array($frame[0], $frame[1]), $frame[2]);

        if (!$nowait && !$noreturn) {
            $expected = array($this->waitHelper->get_wait('basic.cancel_ok'));

            return $this->wait($expected);
        }

        // No reply is awaited: forget the consumer callback right away.
        unset($this->callbacks[$consumer_tag]);

        return $consumer_tag;
    }
860
861
    /**
     * Always raises an AMQPBasicCancelException built from the short string
     * read from $args.
     *
     * @param AMQPReader $args
     * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException
     */
    protected function basic_cancel_from_server(AMQPReader $args)
    {
        // presumably the consumer tag being cancelled - TODO confirm
        $detail = $args->read_shortstr();

        throw new AMQPBasicCancelException($detail);
    }
869
870
    /**
     * Confirm a cancelled consumer
     *
     * Reads the consumer tag from $args and removes the callback registered
     * under that tag.
     *
     * @param AMQPReader $args
     * @return string The consumer tag that was read
     */
    protected function basic_cancel_ok($args)
    {
        $cancelled_tag = $args->read_shortstr();

        unset($this->callbacks[$cancelled_tag]);

        return $cancelled_tag;
    }
883
884
    /**
     * Starts a queue consumer
     *
     * Unless $nowait is exactly false, no reply is awaited and the callback is
     * registered under the consumer tag that was passed in. Otherwise the tag
     * obtained from waiting for basic.consume_ok is used.
     *
     * NOTE(review): static analysis reports that Protocol080::basicConsume()
     * takes fewer arguments than are passed here, and that
     * Protocol091::basicConsume() expects an array where null may be passed.
     *
     * @param string $queue
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack
     * @param bool $exclusive
     * @param bool $nowait
     * @param callback|null $callback
     * @param int|null $ticket
     * @param array $arguments
     * @return mixed|string
     */
    public function basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array()
    ) {
        $ticket = $this->getTicket($ticket);

        // Extra arguments are only forwarded when speaking protocol 0.9.1.
        $consume_arguments = ($this->protocolVersion == '0.9.1') ? $arguments : null;

        // The writer returns array(class id, method id, encoded arguments).
        $frame = $this->protocolWriter->basicConsume(
            $ticket,
            $queue,
            $consumer_tag,
            $no_local,
            $no_ack,
            $exclusive,
            $nowait,
            $consume_arguments
        );

        $this->send_method_frame(array($frame[0], $frame[1]), $frame[2]);

        if ($nowait === false) {
            $expected = array($this->waitHelper->get_wait('basic.consume_ok'));
            $consumer_tag = $this->wait($expected);
        }

        $this->callbacks[$consumer_tag] = $callback;

        return $consumer_tag;
    }
933
934
    /**
     * Confirms a new consumer
     *
     * @param AMQPReader $args
     * @return string The short string read from $args
     */
    protected function basic_consume_ok($args)
    {
        $consumer_tag = $args->read_shortstr();

        return $consumer_tag;
    }
944
945
    /**
     * Notifies the client of a consumer message
     *
     * Fills $msg->delivery_info from the method arguments and, when a callback
     * is registered for the consumer tag, invokes it with $msg.
     *
     * @param AMQPReader $args
     * @param AMQPMessage $msg
     */
    protected function basic_deliver($args, $msg)
    {
        $consumer_tag = $args->read_shortstr();

        // Array elements are evaluated top to bottom, so the remaining fields
        // are read in this order: delivery tag, redelivered bit, exchange,
        // routing key.
        $msg->delivery_info = array(
            'channel' => $this,
            'consumer_tag' => $consumer_tag,
            'delivery_tag' => $args->read_longlong(),
            'redelivered' => $args->read_bit(),
            'exchange' => $args->read_shortstr(),
            'routing_key' => $args->read_shortstr()
        );

        if (!isset($this->callbacks[$consumer_tag])) {
            return;
        }

        call_user_func($this->callbacks[$consumer_tag], $msg);
    }
972
973
    /**
     * Direct access to a queue if no message was available in the queue, return null
     *
     * Sends the request and returns whatever waiting for basic.get_ok or
     * basic.get_empty yields.
     *
     * @param string $queue
     * @param bool $no_ack
     * @param null $ticket
     * @return mixed
     */
    public function basic_get($queue = '', $no_ack = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);

        // The writer returns array(class id, method id, encoded arguments).
        $frame = $this->protocolWriter->basicGet($ticket, $queue, $no_ack);
        $this->send_method_frame(array($frame[0], $frame[1]), $frame[2]);

        $expected = array(
            $this->waitHelper->get_wait('basic.get_ok'),
            $this->waitHelper->get_wait('basic.get_empty')
        );

        return $this->wait($expected);
    }
993
994
    /**
     * Indicates no messages available
     *
     * @param AMQPReader $args
     */
    protected function basic_get_empty($args)
    {
        // The single short-string field is read from $args and discarded;
        // its value is not used.
        $args->read_shortstr();
    }
1003
1004
    /**
     * Provides client with a message
     *
     * Reads the delivery details from $args, stores them on
     * $msg->delivery_info and returns the same message instance.
     *
     * @param AMQPReader $args
     * @param AMQPMessage $msg
     * @return AMQPMessage
     */
    protected function basic_get_ok($args, $msg)
    {
        // Fields are read one by one in the order listed here.
        $info = array();
        $info['delivery_tag'] = $args->read_longlong();
        $info['redelivered'] = $args->read_bit();
        $info['exchange'] = $args->read_shortstr();
        $info['routing_key'] = $args->read_shortstr();
        $info['message_count'] = $args->read_long();

        $msg->delivery_info = $info;

        return $msg;
    }
1029
1030
    /**
     * Returns the encoded basic.publish method frame for the given parameters,
     * building it on first use and caching it in $this->publish_cache.
     *
     * When the cache grows beyond $this->publish_cache_max_size, its first
     * (oldest inserted) entry is dropped. The freshly built frame is returned
     * from a local variable, so the result stays valid even if that eviction
     * removes the entry that was just inserted (e.g. a max size of 0).
     *
     * NOTE(review): the cache key joins the parameters with "|"; values that
     * themselves contain "|" could produce the same key for different
     * parameter sets - confirm whether that can occur in practice.
     *
     * @param string $exchange
     * @param string $routing_key
     * @param $mandatory
     * @param $immediate
     * @param $ticket
     * @return mixed The encoded method frame
     */
    private function pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket)
    {
        $cache_key = sprintf(
            '%s|%s|%s|%s|%s',
            $exchange,
            $routing_key,
            $mandatory,
            $immediate,
            $ticket
        );

        if (isset($this->publish_cache[$cache_key])) {
            return $this->publish_cache[$cache_key];
        }

        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish(
            $ticket,
            $exchange,
            $routing_key,
            $mandatory,
            $immediate
        );

        $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args);
        $frame = $pkt->getvalue();
        $this->publish_cache[$cache_key] = $frame;

        if (count($this->publish_cache) > $this->publish_cache_max_size) {
            // Evict the first key in insertion order.
            reset($this->publish_cache);
            $old_key = key($this->publish_cache);
            unset($this->publish_cache[$old_key]);
        }

        // Do not re-read the cache here: the entry may just have been evicted.
        return $frame;
    }
1069
1070
    /**
     * Publishes a message
     *
     * Sends the method frame and the message content through the connection.
     * While $this->next_delivery_tag is greater than 0, the message is also
     * stored in $this->published_messages under the current tag and the tag
     * is incremented.
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param null $ticket
     */
    public function basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        $method_frame = $this->pre_publish($exchange, $routing_key, $mandatory, $immediate, $ticket);

        $writer = new AMQPWriter();
        $writer->write($method_frame);

        $this->connection->send_content(
            $this->channel_id,
            60,
            0,
            mb_strlen($msg->body, 'ASCII'),
            $msg->serialize_properties(),
            $msg->body,
            $writer
        );

        if ($this->next_delivery_tag <= 0) {
            return;
        }

        // Remember the message under its delivery tag, then advance the tag.
        $this->published_messages[$this->next_delivery_tag] = $msg;
        $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
    }
1106
1107
    /**
     * Queues a publish call so that it can be sent later by publish_batch().
     *
     * The arguments are not used directly; the argument list exactly as
     * passed by the caller is appended to $this->batch_messages.
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param null $ticket
     */
    public function batch_basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        // Only the arguments actually supplied are recorded; defaults are
        // filled in when the batch is published.
        $call_arguments = func_get_args();

        $this->batch_messages[] = $call_arguments;
    }
1125
1126
    /**
     * Publish batch
     *
     * Encodes every queued call from $this->batch_messages into one writer,
     * writes the combined payload to the connection in a single call and then
     * empties the queue. Does nothing when no messages are queued.
     *
     * @return void
     */
    public function publish_batch()
    {
        if (!$this->batch_messages) {
            return null;
        }

        // Fallbacks for optional positions 1..5 of a queued argument list:
        // exchange, routing key, mandatory, immediate, ticket.
        $fallbacks = array(1 => '', 2 => '', 3 => false, 4 => false, 5 => null);

        /** @var AMQPWriter $buffer */
        $buffer = new AMQPWriter();

        foreach ($this->batch_messages as $queued) {
            /** @var AMQPMessage $message */
            $message = $queued[0];

            $options = array();
            foreach ($fallbacks as $position => $fallback) {
                $options[$position] = isset($queued[$position]) ? $queued[$position] : $fallback;
            }

            $buffer->write(
                $this->pre_publish($options[1], $options[2], $options[3], $options[4], $options[5])
            );

            $this->connection->prepare_content(
                $this->channel_id,
                60,
                0,
                mb_strlen($message->body, 'ASCII'),
                $message->serialize_properties(),
                $message->body,
                $buffer
            );

            if ($this->next_delivery_tag <= 0) {
                continue;
            }

            // Remember the message under its delivery tag, then advance the tag.
            $this->published_messages[$this->next_delivery_tag] = $message;
            $this->next_delivery_tag = bcadd($this->next_delivery_tag, '1', 0);
        }

        // Everything accumulated above goes out in one write.
        $this->connection->write($buffer->getvalue());
        $this->batch_messages = array();
    }
1171
1172
    /**
     * Specifies QoS
     *
     * Sends the request and returns the result of waiting for basic.qos_ok.
     *
     * @param int $prefetch_size
     * @param int $prefetch_count
     * @param bool $a_global
     * @return mixed
     */
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)
    {
        // The writer returns array(class id, method id, encoded arguments).
        $frame = $this->protocolWriter->basicQos($prefetch_size, $prefetch_count, $a_global);
        $this->send_method_frame(array($frame[0], $frame[1]), $frame[2]);

        $expected = array($this->waitHelper->get_wait('basic.qos_ok'));

        return $this->wait($expected);
    }
1194
1195
    /**
     * Confirms QoS request
     *
     * @param $args Not used
     */
    protected function basic_qos_ok($args)
    {
        // Intentionally empty: nothing is read from $args.
    }
1201
1202
    /**
     * Redelivers unacknowledged messages
     *
     * Sends the request and returns the result of waiting for
     * basic.recover_ok.
     *
     * @param bool $requeue
     * @return mixed
     */
    public function basic_recover($requeue = false)
    {
        // The writer returns array(class id, method id, encoded arguments).
        $frame = $this->protocolWriter->basicRecover($requeue);
        $this->send_method_frame(array($frame[0], $frame[1]), $frame[2]);

        $expected = array($this->waitHelper->get_wait('basic.recover_ok'));

        return $this->wait($expected);
    }
1217
1218
    /**
     * Confirm the requested recover
     *
     * @param $args Not used
     */
    protected function basic_recover_ok($args)
    {
        // Intentionally empty: nothing is read from $args.
    }
1224
1225
    /**
     * Rejects an incoming message
     *
     * The method frame is sent without waiting for any reply.
     *
     * @param string $delivery_tag
     * @param bool $requeue
     */
    public function basic_reject($delivery_tag, $requeue)
    {
        // The writer returns array(class id, method id, encoded arguments).
        $frame = $this->protocolWriter->basicReject($delivery_tag, $requeue);

        $this->send_method_frame(array($frame[0], $frame[1]), $frame[2]);
    }
1236
1237
    /**
     * Returns a failed message
     *
     * Reads reply code, reply text, exchange and routing key from $args and
     * passes them, followed by $msg, to the callback stored in
     * $this->basic_return_callback. When that callback is not callable, a
     * debug message is logged and the returned message is skipped.
     *
     * @param AMQPReader $args
     * @param AMQPMessage $msg
     * @return null
     */
    protected function basic_return($args, $msg)
    {
        // The fields are always consumed, even when no listener is registered.
        $callback_arguments = array(
            $args->read_short(),
            $args->read_shortstr(),
            $args->read_shortstr(),
            $args->read_shortstr(),
            $msg,
        );

        $listener = $this->basic_return_callback;

        if (is_callable($listener)) {
            call_user_func_array($listener, $callback_arguments);

            return;
        }

        $this->debug->debug_msg('Skipping unhandled basic_return message');

        return null;
    }
1265
1266
    /**
     * Sends method frame (90, 20) and returns the result of waiting for
     * tx.commit_ok.
     *
     * @return mixed
     */
    public function tx_commit()
    {
        $this->send_method_frame(array(90, 20));

        $expected = array($this->waitHelper->get_wait('tx.commit_ok'));

        return $this->wait($expected);
    }
1277
1278
    /**
     * Confirms a successful commit
     *
     * @param $args Not used
     */
    protected function tx_commit_ok($args)
    {
        // Intentionally empty: nothing is read from $args.
    }
1284
1285
    /**
     * Rollbacks the current transaction
     *
     * Sends method frame (90, 30) and returns the result of waiting for
     * tx.rollback_ok.
     *
     * @return mixed
     */
    public function tx_rollback()
    {
        $this->send_method_frame(array(90, 30));

        $expected = array($this->waitHelper->get_wait('tx.rollback_ok'));

        return $this->wait($expected);
    }
1298
1299
    /**
     * Confirms a successful rollback
     *
     * @param $args Not used
     */
    protected function tx_rollback_ok($args)
    {
        // Intentionally empty: nothing is read from $args.
    }
1305
1306
    /**
     * Puts the channel into confirm mode
     * Beware that only non-transactional channels may be put into confirm mode and vice versa
     *
     * Without $nowait, the method waits for confirm.select_ok and then sets
     * $this->next_delivery_tag to 1.
     *
     * NOTE(review): with $nowait set, $this->next_delivery_tag is left
     * untouched, so publishes guarded by "next_delivery_tag > 0" would not be
     * recorded - confirm whether this is intended.
     *
     * NOTE(review): static analysis reports that confirmSelect() is only
     * provided by the Protocol091 writer and not by Protocol080.
     *
     * @param bool $nowait
     */
    public function confirm_select($nowait = false)
    {
        // The writer returns array(class id, method id, encoded arguments).
        $frame = $this->protocolWriter->confirmSelect($nowait);
        $this->send_method_frame(array($frame[0], $frame[1]), $frame[2]);

        if (!$nowait) {
            $expected = array($this->waitHelper->get_wait('confirm.select_ok'));
            $this->wait($expected);

            // Delivery tags for tracked publishes start at 1.
            $this->next_delivery_tag = 1;
        }

        return null;
    }
1325
1326
    /**
     * Confirms a selection
     *
     * @return void
     */
    public function confirm_select_ok()
    {
        // Intentionally empty: there is nothing to process.
    }
1334
1335
    /**
     * Waits for pending acks and nacks from the server.
     * If there are no pending acks, the method returns immediately
     *
     * The loop keeps waiting for basic.ack / basic.nack for as long as
     * $this->published_messages is not empty.
     *
     * @param int $timeout Waits until $timeout value is reached
     */
    public function wait_for_pending_acks($timeout = 0)
    {
        $expected = array(
            $this->waitHelper->get_wait('basic.ack'),
            $this->waitHelper->get_wait('basic.nack'),
        );

        // $timeout never changes inside the loop, so decide once which
        // form of wait() is used.
        $use_timeout = $timeout > 0;

        while (0 !== count($this->published_messages)) {
            if (!$use_timeout) {
                $this->wait($expected);
                continue;
            }

            $this->wait($expected, true, $timeout);
        }
    }
1356
1357
    /**
     * Waits for pending acks, nacks and returns from the server. If there are no pending acks, the method returns immediately.
     *
     * The loop keeps waiting for basic.ack / basic.nack / basic.return for as
     * long as $this->published_messages is not empty.
     *
     * @param int $timeout If set to value > 0 the method will wait at most $timeout seconds for pending acks.
     */
    public function wait_for_pending_acks_returns($timeout = 0)
    {
        $expected = array(
            $this->waitHelper->get_wait('basic.ack'),
            $this->waitHelper->get_wait('basic.nack'),
            $this->waitHelper->get_wait('basic.return'),
        );

        // $timeout never changes inside the loop, so decide once which
        // form of wait() is used.
        $use_timeout = $timeout > 0;

        while (0 !== count($this->published_messages)) {
            if (!$use_timeout) {
                $this->wait($expected);
                continue;
            }

            $this->wait($expected, true, $timeout);
        }
    }
1378
1379
    /**
     * Selects standard transaction mode
     *
     * Sends method frame (90, 10) and returns the result of waiting for
     * tx.select_ok.
     *
     * @return mixed
     */
    public function tx_select()
    {
        $this->send_method_frame(array(90, 10));

        $expected = array($this->waitHelper->get_wait('tx.select_ok'));

        return $this->wait($expected);
    }
1392
1393
    /**
     * Confirms transaction mode
     *
     * @param $args Not used
     */
    protected function tx_select_ok($args)
    {
        // Intentionally empty: nothing is read from $args.
    }
1399
1400
    /**
     * Normalises an optional arguments value: null becomes an empty array,
     * anything else is returned unchanged.
     *
     * @param mixed $arguments
     * @return array|mixed
     */
    protected function getArguments($arguments)
    {
        if (null === $arguments) {
            return array();
        }

        return $arguments;
    }
1408
1409
    /**
     * Resolves the ticket to use: null falls back to $this->default_ticket,
     * any other value is returned unchanged.
     *
     * @param mixed $ticket
     * @return int
     */
    protected function getTicket($ticket)
    {
        if (null === $ticket) {
            return $this->default_ticket;
        }

        return $ticket;
    }
1417
1418
    /**
     * Helper method that takes a message out of $this->published_messages:
     * the entry stored under $index is removed from the array and returned.
     *
     * @param $index Key of the message in $this->published_messages
     * @return AMQPMessage
     */
    protected function get_and_unset_message($index)
    {
        $taken = $this->published_messages[$index];

        unset($this->published_messages[$index]);

        return $taken;
    }
1431
1432
    /**
     * Sets callback for basic_return
     *
     * @param  callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_return_listener($callback)
    {
        if (!is_callable($callback)) {
            // Build a printable label first: handing an array or an object
            // without __toString() straight to sprintf('%s') would raise a
            // conversion warning/error instead of the documented exception.
            if (is_array($callback)) {
                $label = 'Array';
            } elseif (is_object($callback) && !method_exists($callback, '__toString')) {
                $label = get_class($callback);
            } else {
                $label = (string) $callback;
            }

            throw new \InvalidArgumentException(sprintf(
                'Given callback "%s" should be callable. %s type was given.',
                $label,
                gettype($callback)
            ));
        }

        $this->basic_return_callback = $callback;
    }
1450
1451
    /**
     * Sets a handler which called for any message nack'ed by the server, with the AMQPMessage as first argument.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_nack_handler($callback)
    {
        if (!is_callable($callback)) {
            // Build a printable label first: handing an array or an object
            // without __toString() straight to sprintf('%s') would raise a
            // conversion warning/error instead of the documented exception.
            if (is_array($callback)) {
                $label = 'Array';
            } elseif (is_object($callback) && !method_exists($callback, '__toString')) {
                $label = get_class($callback);
            } else {
                $label = (string) $callback;
            }

            throw new \InvalidArgumentException(sprintf(
                'Given callback "%s" should be callable. %s type was given.',
                $label,
                gettype($callback)
            ));
        }

        $this->nack_handler = $callback;
    }
1469
1470
    /**
     * Sets a handler which called for any message ack'ed by the server, with the AMQPMessage as first argument.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_ack_handler($callback)
    {
        if (!is_callable($callback)) {
            // Build a printable label first: handing an array or an object
            // without __toString() straight to sprintf('%s') would raise a
            // conversion warning/error instead of the documented exception.
            if (is_array($callback)) {
                $label = 'Array';
            } elseif (is_object($callback) && !method_exists($callback, '__toString')) {
                $label = get_class($callback);
            } else {
                $label = (string) $callback;
            }

            throw new \InvalidArgumentException(sprintf(
                'Given callback "%s" should be callable. %s type was given.',
                $label,
                gettype($callback)
            ));
        }

        $this->ack_handler = $callback;
    }
1488
}
1489