AMQPChannel   F
last analyzed

Complexity

Total Complexity 133

Size/Duplication

Total Lines 1603
Duplicated Lines 0 %

Test Coverage

Coverage 52.83%

Importance

Changes 5
Bugs 1 Features 1
Metric   Value
eloc     485
dl       0
loc      1603
ccs      224
cts      424
cp       0.5283
rs       2
c        5
b        1
f        1
wmc      133

77 Methods

Rating   Name   Duplication   Size   Complexity  
A do_close() 0 8 2
A channel_alert() 0 6 1
A channel_close_ok() 0 3 1
A __construct() 0 18 3
A is_open() 0 3 1
A close() 0 26 4
A flow() 0 8 1
A channel_close() 0 11 1
A queue_unbind() 0 22 1
A queue_delete() 0 21 2
A exchange_unbind_ok() 0 2 1
A exchange_delete_ok() 0 2 1
A channel_open_ok() 0 5 1
A prePublish() 0 30 3
A basic_get_ok() 0 14 1
A x_flow_ok() 0 4 1
A is_consuming() 0 3 1
A basic_cancel() 0 13 3
A basic_nack() 0 4 1
A queue_declare_ok() 0 7 1
A batch_basic_publish() 0 15 1
A queue_purge() 0 14 2
A get_keys_less_or_equal() 0 19 2
A exchange_declare_ok() 0 2 1
A queue_bind() 0 28 2
A queue_delete_ok() 0 3 1
A basic_nack_from_server() 0 13 2
A basic_get_empty() 0 2 1
A basic_get() 0 11 1
A channel_flow() 0 4 1
A access_request_ok() 0 5 1
A basic_deliver() 0 15 2
B publish_batch() 0 40 9
B basic_consume() 0 44 8
A exchange_delete() 0 23 2
A channel_flow_ok() 0 3 1
A queue_purge_ok() 0 3 1
A exchange_bind() 0 28 2
A basic_cancel_ok() 0 6 1
A access_request() 0 22 1
A basic_cancel_from_server() 0 3 1
A queue_bind_ok() 0 2 1
A exchange_bind_ok() 0 2 1
A basic_publish() 0 31 3
A x_open() 0 12 2
A queue_declare() 0 32 2
A basic_ack() 0 4 1
A queue_unbind_ok() 0 2 1
A exchange_declare() 0 34 2
A internal_ack_handler() 0 11 3
A basic_consume_ok() 0 3 1
A basic_ack_from_server() 0 13 2
A exchange_unbind() 0 24 1
A wait_for_pending_acks_returns() 0 11 2
A set_ack_handler() 0 4 1
B consume() 0 28 8
A tx_select() 0 7 1
A getTicket() 0 3 2
A basic_recover_ok() 0 2 1
A tx_rollback() 0 7 1
A basic_qos_ok() 0 2 1
A checkConnection() 0 7 4
A basic_qos() 0 13 1
A tx_commit_ok() 0 2 1
A get_and_unset_message() 0 6 1
A basic_recover() 0 8 1
A set_return_listener() 0 4 1
A tx_select_ok() 0 2 1
A stopConsume() 0 3 1
A tx_rollback_ok() 0 2 1
A basic_return() 0 19 2
A set_nack_handler() 0 4 1
A wait_for_pending_acks() 0 9 2
A basic_reject() 0 4 1
A confirm_select() 0 14 2
A confirm_select_ok() 0 2 1
A tx_commit() 0 7 1

How to fix   Complexity   

Complex Class

Complex classes like AMQPChannel often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share a prefix or suffix; in the method list above, the tx_*, queue_*, and exchange_* groups are candidates.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use AMQPChannel, and based on these observations, apply Extract Interface, too.
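
As a rough illustration of Extract Class combined with Extract Interface, the sketch below pulls the tx_* method group into its own type. It is only a sketch under stated assumptions: TransactionOperations, ChannelTransactions, and the $sendAndWait callable are hypothetical names that do not exist in php-amqplib, and the callable stands in for the channel's real send/wait logic so the example runs without a broker.

```php
<?php

// Hypothetical interface extracted from the tx_* method group.
interface TransactionOperations
{
    public function tx_select();
    public function tx_commit();
    public function tx_rollback();
}

// Hypothetical extracted class: it owns only the transaction concern and
// delegates the wire-level work to a callable supplied by the channel.
final class ChannelTransactions implements TransactionOperations
{
    /** @var callable */
    private $sendAndWait;

    public function __construct(callable $sendAndWait)
    {
        $this->sendAndWait = $sendAndWait;
    }

    public function tx_select()
    {
        return call_user_func($this->sendAndWait, 'tx.select', 'tx.select_ok');
    }

    public function tx_commit()
    {
        return call_user_func($this->sendAndWait, 'tx.commit', 'tx.commit_ok');
    }

    public function tx_rollback()
    {
        return call_user_func($this->sendAndWait, 'tx.rollback', 'tx.rollback_ok');
    }
}

// Stand-in for the channel's send/wait logic (an assumption for this sketch):
// it only records which method was sent and which reply was awaited.
$log = array();
$fakeSendAndWait = function ($method, $expectedReply) use (&$log) {
    $log[] = $method . ' -> ' . $expectedReply;

    return $expectedReply;
};

$tx = new ChannelTransactions($fakeSendAndWait);
$tx->tx_select();
$tx->tx_commit();
```

Callers that only need transactions could then depend on the small interface instead of the full channel class, which is the point of applying Extract Interface alongside the class split.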

<?php

namespace PhpAmqpLib\Channel;

use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPBasicCancelException;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionBlockedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\Assert;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire;
use PhpAmqpLib\Wire\AMQPReader;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Wire\AMQPWriter;

class AMQPChannel extends AbstractChannel
{
    /**
     * @var callable[]
     * @internal Use is_consuming() to check if there is active callbacks
     */
    public $callbacks = array();

    /** @var bool Whether or not the channel has been "opened" */
    protected $is_open = false;

    /** @var int */
    protected $default_ticket = 0;

    /** @var bool */
    protected $active = true;

    /** @var bool */
    protected $stopConsume = false;

    /** @var array */
    protected $alerts = array();

    /** @var bool */
    protected $auto_decode;

    /**
     * These parameters will be passed to function in case of basic_return:
     *    param int $reply_code
     *    param string $reply_text
     *    param string $exchange
     *    param string $routing_key
     *    param AMQPMessage $msg
     *
     * @var null|callable
     */
    protected $basic_return_callback;

    /** @var array Used to keep track of the messages that are going to be batch published. */
    protected $batch_messages = array();

    /**
     * If the channel is in confirm_publish mode this array will store all published messages
     * until they get ack'ed or nack'ed
     *
     * @var AMQPMessage[]
     */
    private $published_messages = array();

    /** @var int */
    private $next_delivery_tag = 0;

    /** @var null|callable */
    private $ack_handler;

    /** @var null|callable */
    private $nack_handler;

    /**
     * Circular buffer to speed up both basic_publish() and publish_batch().
     * Max size limited by $publish_cache_max_size.
     *
     * @var array
     * @see basic_publish()
     * @see publish_batch()
     */
    private $publish_cache = array();

    /**
     * Maximal size of $publish_cache
     *
     * @var int
     */
    private $publish_cache_max_size = 100;

    /**
     * Maximum time to wait for operations on this channel, in seconds.
     * @var float
     */
    protected $channel_rpc_timeout;

    /**
     * @param AbstractConnection $connection
     * @param int|null $channel_id
     * @param bool $auto_decode
     * @param int|float $channel_rpc_timeout
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
     */
    public function __construct($connection, $channel_id = null, $auto_decode = true, $channel_rpc_timeout = 0)
    {
        if ($channel_id == null) {
Bug (Best Practice): It seems like you are loosely comparing $channel_id of type integer|null against null; this is ambiguous if the integer can be zero. Consider using a strict comparison === instead.
            $channel_id = $connection->get_free_channel_id();
        }

        parent::__construct($connection, $channel_id);

        $this->debug->debug_msg('using channel_id: ' . $channel_id);

        $this->auto_decode = $auto_decode;
        $this->channel_rpc_timeout = $channel_rpc_timeout;

        try {
            $this->x_open();
        } catch (\Exception $e) {
            $this->close();
            throw $e;
        }
    }
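
The loose comparison flagged in the constructor above can be reproduced in isolation. The two helper functions below are hypothetical and exist only for this sketch; they mirror the `$channel_id == null` check and the strict alternative the report suggests, and show that a channel id of 0 is treated as "not provided" only by the loose form.

```php
<?php

// Hypothetical helpers for illustration only; not part of php-amqplib.
function needs_free_channel_id_loose($channel_id)
{
    // Loose comparison: true for null, but also for 0.
    return $channel_id == null;
}

function needs_free_channel_id_strict($channel_id)
{
    // Strict comparison: true only for null.
    return $channel_id === null;
}

var_dump(needs_free_channel_id_loose(null));   // bool(true)
var_dump(needs_free_channel_id_loose(0));      // bool(true)
var_dump(needs_free_channel_id_strict(0));     // bool(false)
var_dump(needs_free_channel_id_strict(null));  // bool(true)
```

Whether 0 is ever a meaningful id for this class is not stated in the report; the sketch only shows why the checker calls the loose form ambiguous.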

    /**
     * @return bool
     */
    public function is_open()
    {
        return $this->is_open;
    }

    /**
     * Tear down this object, after we've agreed to close with the server.
     */
    protected function do_close()
    {
        if ($this->channel_id !== null) {
            unset($this->connection->channels[$this->channel_id]);
        }
        $this->channel_id = $this->connection = null;
        $this->is_open = false;
        $this->callbacks = array();
    }

    /**
     * Only for AMQP0.8.0
     * This method allows the server to send a non-fatal warning to
     * the client.  This is used for methods that are normally
     * asynchronous and thus do not have confirmations, and for which
     * the server may detect errors that need to be reported.  Fatal
     * errors are handled as channel or connection exceptions; non-
     * fatal errors are sent through this method.
     *
     * @param AMQPReader $reader
     */
    protected function channel_alert(AMQPReader $reader): void
    {
        $reply_code = $reader->read_short();
        $reply_text = $reader->read_shortstr();
        $details = $reader->read_table();
        array_push($this->alerts, array($reply_code, $reply_text, $details));
    }

    /**
     * Request a channel close
     *
     * @param int $reply_code
     * @param string $reply_text
     * @param array $method_sig
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
    {
        $this->callbacks = array();
        if ($this->is_open === false || $this->connection === null) {
            $this->do_close();

            return null; // already closed
        }
        list($class_id, $method_id, $args) = $this->protocolWriter->channelClose(
            $reply_code,
            $reply_text,
            $method_sig[0],
            $method_sig[1]
        );

        try {
            $this->send_method_frame(array($class_id, $method_id), $args);
        } catch (\Exception $e) {
            $this->do_close();

            throw $e;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('channel.close_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * @param AMQPReader $reader
     * @throws AMQPProtocolChannelException
     */
    protected function channel_close(AMQPReader $reader): void
    {
        $reply_code = $reader->read_short();
        $reply_text = $reader->read_shortstr();
        $class_id = $reader->read_short();
        $method_id = $reader->read_short();

        $this->send_method_frame(array(20, 41));
        $this->do_close();

        throw new AMQPProtocolChannelException($reply_code, $reply_text, array($class_id, $method_id));
    }

    /**
     * Confirm a channel close
     * Alias of AMQPChannel::do_close()
     */
    protected function channel_close_ok()
    {
        $this->do_close();
    }

    /**
     * Enables/disables flow from peer
     *
     * @param bool $active
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function flow($active)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->channelFlow($active);
        $this->send_method_frame(array($class_id, $method_id), $args);

        return $this->wait(array(
            $this->waitHelper->get_wait('channel.flow_ok')
        ), false, $this->channel_rpc_timeout);
    }

    protected function channel_flow(AMQPReader $reader): void
    {
        $this->active = $reader->read_bit();
        $this->x_flow_ok($this->active);
    }

    /**
     * @param bool $active
     */
    protected function x_flow_ok($active)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->channelFlow($active);
        $this->send_method_frame(array($class_id, $method_id), $args);
    }

    protected function channel_flow_ok(AMQPReader $reader): bool
    {
        return $reader->read_bit();
    }

    /**
     * @param string $out_of_band
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
     * @return mixed
     */
    protected function x_open($out_of_band = '')
    {
        if ($this->is_open) {
            return null;
        }

        list($class_id, $method_id, $args) = $this->protocolWriter->channelOpen($out_of_band);
        $this->send_method_frame(array($class_id, $method_id), $args);

        return $this->wait(array(
            $this->waitHelper->get_wait('channel.open_ok')
        ), false, $this->channel_rpc_timeout);
    }

    protected function channel_open_ok()
    {
        $this->is_open = true;

        $this->debug->debug_msg('Channel open');
    }

    /**
     * Requests an access ticket
     *
     * @param string $realm
     * @param bool $exclusive
     * @param bool $passive
     * @param bool $active
     * @param bool $write
     * @param bool $read
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function access_request(
        $realm,
        $exclusive = false,
        $passive = false,
        $active = false,
        $write = false,
        $read = false
    ) {
        list($class_id, $method_id, $args) = $this->protocolWriter->accessRequest(
            $realm,
            $exclusive,
            $passive,
            $active,
            $write,
            $read
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        return $this->wait(array(
            $this->waitHelper->get_wait('access.request_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Grants access to server resources
     *
     * @param AMQPReader $reader
     * @return int
     */
    protected function access_request_ok(AMQPReader $reader): int
    {
        $this->default_ticket = $reader->read_short();

        return $this->default_ticket;
    }

    /**
     * Declares exchange
     *
     * @param string $exchange
     * @param string $type
     * @param bool $passive
     * @param bool $durable
     * @param bool $auto_delete
     * @param bool $internal
     * @param bool $nowait
     * @param AMQPTable|array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_declare(
        $exchange,
        $type,
        $passive = false,
        $durable = false,
        $auto_delete = true,
        $internal = false,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeDeclare(
            $ticket,
            $exchange,
            $type,
            $passive,
            $durable,
            $auto_delete,
            $internal,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('exchange.declare_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms an exchange declaration
     */
    protected function exchange_declare_ok()
    {
    }

    /**
     * Deletes an exchange
     *
     * @param string $exchange
     * @param bool $if_unused
     * @param bool $nowait
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_delete(
        $exchange,
        $if_unused = false,
        $nowait = false,
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeDelete(
            $ticket,
            $exchange,
            $if_unused,
            $nowait
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('exchange.delete_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms deletion of an exchange
     */
    protected function exchange_delete_ok()
    {
    }

    /**
     * Binds dest exchange to source exchange
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_bind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeBind(
Bug: The method exchangeBind() does not exist on PhpAmqpLib\Helper\Protocol\Protocol080. (Ignorable by Annotation)

If this is a false-positive, you can also ignore this issue in your code via the ignore-call annotation:

        /** @scrutinizer ignore-call */
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeBind(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.
            $ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('exchange.bind_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms bind successful
     */
    protected function exchange_bind_ok()
    {
    }

    /**
     * Unbinds dest exchange from source exchange
     *
     * @param string $destination
     * @param string $source
     * @param string $routing_key
     * @param bool $nowait
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function exchange_unbind(
        $destination,
        $source,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(
Bug: The method exchangeUnbind() does not exist on PhpAmqpLib\Helper\Protocol\Protocol080. (Ignorable by Annotation)

If this is a false-positive, you can also ignore this issue in your code via the ignore-call annotation:

        /** @scrutinizer ignore-call */
        list($class_id, $method_id, $args) = $this->protocolWriter->exchangeUnbind(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.
            $ticket,
            $destination,
            $source,
            $routing_key,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        return $this->wait(array(
            $this->waitHelper->get_wait('exchange.unbind_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms unbind successful
     */
    protected function exchange_unbind_ok()
    {
    }

    /**
     * Binds queue to an exchange
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param bool $nowait
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function queue_bind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->queueBind(
            $ticket,
            $queue,
            $exchange,
            $routing_key,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('queue.bind_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms bind successful
     */
    protected function queue_bind_ok()
    {
    }

    /**
     * Unbind queue from an exchange
     *
     * @param string $queue
     * @param string $exchange
     * @param string $routing_key
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function queue_unbind(
        $queue,
        $exchange,
        $routing_key = '',
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->queueUnbind(
            $ticket,
            $queue,
            $exchange,
            $routing_key,
            $arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        return $this->wait(array(
            $this->waitHelper->get_wait('queue.unbind_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms unbind successful
     */
    protected function queue_unbind_ok()
    {
    }

    /**
     * Declares queue, creates if needed
     *
     * @param string $queue
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param bool $auto_delete
     * @param bool $nowait
     * @param array|AMQPTable $arguments
     * @param int|null $ticket
     * @return array|null
     *@throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     */
    public function queue_declare(
        $queue = '',
        $passive = false,
        $durable = false,
        $exclusive = false,
        $auto_delete = true,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->queueDeclare(
            $ticket,
            $queue,
            $passive,
            $durable,
            $exclusive,
            $auto_delete,
            $nowait,
            $arguments
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('queue.declare_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms a queue definition
     *
     * @param AMQPReader $reader
     * @return string[]
     */
    protected function queue_declare_ok(AMQPReader $reader)
    {
        $queue = $reader->read_shortstr();
        $message_count = $reader->read_long();
        $consumer_count = $reader->read_long();

        return array($queue, $message_count, $consumer_count);
    }

    /**
     * Deletes a queue
     *
     * @param string $queue
     * @param bool $if_unused
     * @param bool $if_empty
     * @param bool $nowait
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function queue_delete($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);

        list($class_id, $method_id, $args) = $this->protocolWriter->queueDelete(
            $ticket,
            $queue,
            $if_unused,
            $if_empty,
            $nowait
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('queue.delete_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms deletion of a queue
     *
     * @param AMQPReader $reader
     * @return int|string
     */
    protected function queue_delete_ok(AMQPReader $reader)
    {
        return $reader->read_long();
    }

    /**
     * Purges a queue
     *
     * @param string $queue
     * @param bool $nowait
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function queue_purge($queue = '', $nowait = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->queuePurge($ticket, $queue, $nowait);

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('queue.purge_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms a queue purge
     *
     * @param AMQPReader $reader
     * @return int|string
     */
    protected function queue_purge_ok(AMQPReader $reader)
    {
        return $reader->read_long();
    }

    /**
     * Acknowledges one or more messages
     *
     * @param int $delivery_tag
     * @param bool $multiple
     */
    public function basic_ack($delivery_tag, $multiple = false)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->basicAck($delivery_tag, $multiple);
        $this->send_method_frame(array($class_id, $method_id), $args);
    }

    /**
     * Called when the server sends a basic.ack
     *
     * @param AMQPReader $reader
     * @throws AMQPRuntimeException
     */
    protected function basic_ack_from_server(AMQPReader $reader): void
    {
        $delivery_tag = $reader->read_longlong();
        $multiple = (bool) $reader->read_bit();

        if (!isset($this->published_messages[$delivery_tag])) {
            throw new AMQPRuntimeException(sprintf(
                'Server ack\'ed unknown delivery_tag "%s"',
                $delivery_tag
            ));
        }

        $this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);
Bug: It seems like $delivery_tag can also be of type string; however, parameter $delivery_tag of PhpAmqpLib\Channel\AMQPC...:internal_ack_handler() does only seem to accept integer, maybe add an additional type check? (Ignorable by Annotation)

If this is a false-positive, you can also ignore this issue in your code via the ignore-type annotation:

        $this->internal_ack_handler(/** @scrutinizer ignore-type */ $delivery_tag, $multiple, $this->ack_handler);
    }

    /**
     * Called when the server sends a basic.nack
     *
     * @param AMQPReader $reader
     * @throws AMQPRuntimeException
     */
    protected function basic_nack_from_server(AMQPReader $reader): void
    {
        $delivery_tag = $reader->read_longlong();
        $multiple = (bool) $reader->read_bit();

        if (!isset($this->published_messages[$delivery_tag])) {
            throw new AMQPRuntimeException(sprintf(
                'Server nack\'ed unknown delivery_tag "%s"',
                $delivery_tag
            ));
        }

        $this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);
Bug: It seems like $delivery_tag can also be of type string; however, parameter $delivery_tag of PhpAmqpLib\Channel\AMQPC...:internal_ack_handler() does only seem to accept integer, maybe add an additional type check? (Ignorable by Annotation)

If this is a false-positive, you can also ignore this issue in your code via the ignore-type annotation:

        $this->internal_ack_handler(/** @scrutinizer ignore-type */ $delivery_tag, $multiple, $this->nack_handler);
    }
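
Both basic_ack_from_server() and basic_nack_from_server() are flagged because the delivery tag they read may arrive as a string. One possible form of the "additional type check" the report mentions is sketched below. The function normalize_delivery_tag() is hypothetical and not part of php-amqplib, and the sketch assumes tags that fit into a native PHP integer.

```php
<?php

// Hypothetical guard for illustration only; not part of php-amqplib.
// Assumes the tag fits into a native PHP integer.
function normalize_delivery_tag($delivery_tag)
{
    if (is_int($delivery_tag)) {
        return $delivery_tag;
    }

    // Accept purely numeric strings and convert them to int.
    if (is_string($delivery_tag) && ctype_digit($delivery_tag)) {
        return (int) $delivery_tag;
    }

    throw new InvalidArgumentException(
        'Unexpected delivery tag: ' . var_export($delivery_tag, true)
    );
}

// PHP casts decimal-integer string keys to int, so a lookup by '7'
// finds the entry stored under 7.
$published = array(7 => 'message-7');

var_dump(isset($published['7']));             // bool(true)
var_dump(normalize_delivery_tag('7') === 7);  // bool(true)
var_dump(normalize_delivery_tag(7) === 7);    // bool(true)
```

A tag larger than PHP_INT_MAX would not survive the cast, so a real fix would need to decide how to treat such values; the sketch leaves that open.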

    /**
     * Removes acknowledged messages from $this->published_messages and dispatches them to the $handler
     *
     * @param int $delivery_tag
     * @param bool $multiple
     * @param callable $handler
     */
    protected function internal_ack_handler($delivery_tag, $multiple, $handler)
    {
        if ($multiple) {
            $keys = $this->get_keys_less_or_equal($this->published_messages, $delivery_tag);

            foreach ($keys as $key) {
                $this->internal_ack_handler($key, false, $handler);
            }
        } else {
            $message = $this->get_and_unset_message($delivery_tag);
            $this->dispatch_to_handler($handler, array($message));
        }
    }

    /**
     * @param AMQPMessage[] $messages
     * @param string $value
     * @return mixed
     */
    protected function get_keys_less_or_equal(array $messages, $value)
    {
        $value = (int) $value;
        $keys = array_reduce(
            array_keys($messages),
            /**
             * @param string $key
             */
            function ($keys, $key) use ($value) {
                if ($key <= $value) {
                    $keys[] = $key;
                }

                return $keys;
            },
            array()
        );

        return $keys;
    }

    /**
     * Rejects one or several received messages
     *
     * @param int $delivery_tag
     * @param bool $multiple
     * @param bool $requeue
     */
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->basicNack($delivery_tag, $multiple, $requeue);
        $this->send_method_frame(array($class_id, $method_id), $args);
    }

    /**
     * Ends a queue consumer
     *
     * @param string $consumer_tag
     * @param bool $nowait
     * @param bool $noreturn
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_cancel($consumer_tag, $nowait = false, $noreturn = false)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->basicCancel($consumer_tag, $nowait);
        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait || $noreturn) {
            unset($this->callbacks[$consumer_tag]);
            return $consumer_tag;
        }

        return $this->wait(array(
            $this->waitHelper->get_wait('basic.cancel_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * @param AMQPReader $reader
     * @throws \PhpAmqpLib\Exception\AMQPBasicCancelException
     */
    protected function basic_cancel_from_server(AMQPReader $reader)
    {
        throw new AMQPBasicCancelException($reader->read_shortstr());
    }

    /**
     * Confirm a cancelled consumer
     *
     * @param AMQPReader $reader
     * @return string
     */
    protected function basic_cancel_ok(AMQPReader $reader): string
    {
        $consumerTag = $reader->read_shortstr();
        unset($this->callbacks[$consumerTag]);

        return $consumerTag;
    }

    /**
     * @return bool
     */
    public function is_consuming()
    {
        return !empty($this->callbacks);
    }

    /**
     * Start a queue consumer.
     * This method asks the server to start a "consumer", which is a transient request for messages
     * from a specific queue.
     * Consumers last as long as the channel they were declared on, or until the client cancels them.
     *
     * @link https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
     *
     * @param string $queue
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack
     * @param bool $exclusive
     * @param bool $nowait
     * @param callable|null $callback
     * @param int|null $ticket
     * @param \PhpAmqpLib\Wire\AMQPTable|array $arguments
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @throws \InvalidArgumentException
     * @return string
     */
    public function basic_consume(
        $queue = '',
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $callback = null,
        $ticket = null,
        $arguments = array()
    ) {
        if (null !== $callback) {
            Assert::isCallable($callback);
        }
        if ($nowait && empty($consumer_tag)) {
            throw new \InvalidArgumentException('Cannot start consumer without consumer_tag and no-wait=true');
        }
        if (!empty($consumer_tag) && array_key_exists($consumer_tag, $this->callbacks)) {
            throw new \InvalidArgumentException('This consumer tag is already registered.');
        }

        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->basicConsume(
            $ticket,
            $queue,
            $consumer_tag,
            $no_local,
            $no_ack,
            $exclusive,
            $nowait,
            $this->protocolVersion === Wire\Constants091::VERSION ? $arguments : null
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        if (false === $nowait) {
            $consumer_tag = $this->wait(array(
                $this->waitHelper->get_wait('basic.consume_ok')
            ), false, $this->channel_rpc_timeout);
        }

        $this->callbacks[$consumer_tag] = $callback;

        return $consumer_tag;
    }

    /**
     * Confirms a new consumer
     *
     * @param AMQPReader $reader
     * @return string
     */
    protected function basic_consume_ok(AMQPReader $reader): string
    {
        return $reader->read_shortstr();
    }

    /**
     * Notifies the client of a consumer message
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     */
    protected function basic_deliver(AMQPReader $reader, AMQPMessage $message): void
    {
        $consumer_tag = $reader->read_shortstr();
        $delivery_tag = $reader->read_longlong();
        $redelivered = $reader->read_bit();
        $exchange = $reader->read_shortstr();
        $routing_key = $reader->read_shortstr();

        $message
            ->setChannel($this)
            ->setDeliveryInfo($delivery_tag, $redelivered, $exchange, $routing_key)
            ->setConsumerTag($consumer_tag);

        if (isset($this->callbacks[$consumer_tag])) {
            call_user_func($this->callbacks[$consumer_tag], $message);
        }
    }

    /**
     * Direct access to a queue. If no message is available in the queue, returns null.
     *
     * @param string $queue
     * @param bool $no_ack
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return AMQPMessage|null
     */
    public function basic_get($queue = '', $no_ack = false, $ticket = null)
    {
        $ticket = $this->getTicket($ticket);
        list($class_id, $method_id, $args) = $this->protocolWriter->basicGet($ticket, $queue, $no_ack);

        $this->send_method_frame(array($class_id, $method_id), $args);

        return $this->wait(array(
            $this->waitHelper->get_wait('basic.get_ok'),
            $this->waitHelper->get_wait('basic.get_empty')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Indicates no messages available
     */
    protected function basic_get_empty()
    {
    }

    /**
     * Provides client with a message
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     * @return AMQPMessage
     */
    protected function basic_get_ok(AMQPReader $reader, AMQPMessage $message): AMQPMessage
    {
        $delivery_tag = $reader->read_longlong();
        $redelivered = $reader->read_bit();
        $exchange = $reader->read_shortstr();
        $routing_key = $reader->read_shortstr();
        $message_count = $reader->read_long();

        $message
            ->setChannel($this)
            ->setDeliveryInfo($delivery_tag, $redelivered, $exchange, $routing_key)
            ->setMessageCount($message_count);

        return $message;
    }

    /**
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int $ticket
     * @return mixed
     */
    private function prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket)
    {
        $cache_key = sprintf(
            '%s|%s|%s|%s|%s',
            $exchange,
            $routing_key,
            $mandatory,
            $immediate,
            $ticket
        );
        if (false === isset($this->publish_cache[$cache_key])) {
            $ticket = $this->getTicket($ticket);
            list($class_id, $method_id, $args) = $this->protocolWriter->basicPublish(
                $ticket,
                $exchange,
                $routing_key,
                $mandatory,
                $immediate
            );

            $pkt = $this->prepare_method_frame(array($class_id, $method_id), $args);
            $this->publish_cache[$cache_key] = $pkt->getvalue();
            if (count($this->publish_cache) > $this->publish_cache_max_size) {
                reset($this->publish_cache);
                $old_key = key($this->publish_cache);
                unset($this->publish_cache[$old_key]);
            }
        }

        return $this->publish_cache[$cache_key];
    }

    /**
     * Publishes a message
     *
     * @param AMQPMessage $msg
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int|null $ticket
     * @throws AMQPChannelClosedException
     * @throws AMQPConnectionClosedException
     * @throws AMQPConnectionBlockedException
     */
    public function basic_publish(
        $msg,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        $this->checkConnection();
        $pkt = new AMQPWriter();
        $pkt->write($this->prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket));

        try {
            $this->connection->send_content(
                $this->channel_id,
                60,
                0,
                mb_strlen($msg->body, 'ASCII'),
                $msg->serialize_properties(),
                $msg->body,
                $pkt
            );
        } catch (AMQPConnectionClosedException $e) {
            $this->do_close();
            throw $e;
        }

        if ($this->next_delivery_tag > 0) {
            $this->published_messages[$this->next_delivery_tag] = $msg;
            $msg->setDeliveryInfo($this->next_delivery_tag, false, $exchange, $routing_key);
            $this->next_delivery_tag++;
        }
    }

    /**
     * @param AMQPMessage $message
     * @param string $exchange
     * @param string $routing_key
     * @param bool $mandatory
     * @param bool $immediate
     * @param int|null $ticket
     */
    public function batch_basic_publish(
        $message,
        $exchange = '',
        $routing_key = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    ) {
        $this->batch_messages[] = [
            $message,
            $exchange,
            $routing_key,
            $mandatory,
            $immediate,
            $ticket
        ];
    }

    /**
     * Publish batch
     *
     * @return void
     * @throws AMQPChannelClosedException
     * @throws AMQPConnectionClosedException
     * @throws AMQPConnectionBlockedException
     */
    public function publish_batch()
    {
        if (empty($this->batch_messages)) {
            return;
        }

        $this->checkConnection();

        /** @var AMQPWriter $pkt */
        $pkt = new AMQPWriter();

        foreach ($this->batch_messages as $m) {
            /** @var AMQPMessage $msg */
            $msg = $m[0];

            $exchange = isset($m[1]) ? $m[1] : '';
            $routing_key = isset($m[2]) ? $m[2] : '';
            $mandatory = isset($m[3]) ? $m[3] : false;
            $immediate = isset($m[4]) ? $m[4] : false;
            $ticket = isset($m[5]) ? $m[5] : null;
            $pkt->write($this->prePublish($exchange, $routing_key, $mandatory, $immediate, $ticket));

            $this->connection->prepare_content(
                $this->channel_id,
                60,
                0,
                mb_strlen($msg->body, 'ASCII'),
                $msg->serialize_properties(),
                $msg->body,
                $pkt
            );

            if ($this->next_delivery_tag > 0) {
                $this->published_messages[$this->next_delivery_tag] = $msg;
                $this->next_delivery_tag++;
            }
        }

        $this->connection->write($pkt->getvalue());
        $this->batch_messages = array();
    }

    /**
     * Specifies QoS
     *
     * See https://www.rabbitmq.com/consumer-prefetch.html#overview for details
     *
     * @param int $prefetch_size Use 0 for unlimited
     * @param int $prefetch_count Use 0 for unlimited
     * @param bool $a_global If false, prefetch size and count are applied to each channel consumer separately
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->basicQos(
            $prefetch_size,
            $prefetch_count,
            $a_global
        );

        $this->send_method_frame(array($class_id, $method_id), $args);

        return $this->wait(array(
            $this->waitHelper->get_wait('basic.qos_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms QoS request
     */
    protected function basic_qos_ok()
    {
    }

    /**
     * Redelivers unacknowledged messages
     *
     * @param bool $requeue
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_recover($requeue = false)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->basicRecover($requeue);
        $this->send_method_frame(array($class_id, $method_id), $args);

        return $this->wait(array(
            $this->waitHelper->get_wait('basic.recover_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirm the requested recover
     */
    protected function basic_recover_ok()
    {
    }

    /**
     * Rejects an incoming message
     *
     * @param int $delivery_tag
     * @param bool $requeue
     */
    public function basic_reject($delivery_tag, $requeue)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->basicReject($delivery_tag, $requeue);
        $this->send_method_frame(array($class_id, $method_id), $args);
    }

    /**
     * Returns a failed message
     *
     * @param AMQPReader $reader
     * @param AMQPMessage $message
     */
    protected function basic_return(AMQPReader $reader, AMQPMessage $message)
    {
        $callback = $this->basic_return_callback;
        if (!is_callable($callback)) {
            $this->debug->debug_msg('Skipping unhandled basic_return message');
            return null;
        }

        $reply_code = $reader->read_short();
        $reply_text = $reader->read_shortstr();
        $exchange = $reader->read_shortstr();
        $routing_key = $reader->read_shortstr();

        call_user_func_array($callback, array(
            $reply_code,
            $reply_text,
            $exchange,
            $routing_key,
            $message,
        ));
    }

    /**
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function tx_commit()
    {
        $this->send_method_frame(array(90, 20));

        return $this->wait(array(
            $this->waitHelper->get_wait('tx.commit_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms a successful commit
     */
    protected function tx_commit_ok()
    {
    }

    /**
     * Rolls back the current transaction
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function tx_rollback()
    {
        $this->send_method_frame(array(90, 30));

        return $this->wait(array(
            $this->waitHelper->get_wait('tx.rollback_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms a successful rollback
     */
    protected function tx_rollback_ok()
    {
    }

    /**
     * Puts the channel into confirm mode
     * Beware that only non-transactional channels may be put into confirm mode and vice versa
     *
     * @param bool $nowait
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     */
    public function confirm_select($nowait = false)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->confirmSelect($nowait);

        $this->send_method_frame(array($class_id, $method_id), $args);

        if ($nowait) {
            return null;
        }

        $this->wait(array(
            $this->waitHelper->get_wait('confirm.select_ok')
        ), false, $this->channel_rpc_timeout);
        $this->next_delivery_tag = 1;
    }

    /**
     * Confirms a selection
     */
    public function confirm_select_ok()
    {
    }

    /**
     * Waits for pending acks and nacks from the server.
     * If there are no pending acks, the method returns immediately
     *
     * @param int|float $timeout Waits until $timeout value is reached
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    public function wait_for_pending_acks($timeout = 0)
    {
        $functions = array(
            $this->waitHelper->get_wait('basic.ack'),
            $this->waitHelper->get_wait('basic.nack'),
        );
        $timeout = max(0, $timeout);
        while (!empty($this->published_messages)) {
            $this->wait($functions, false, $timeout);
        }
    }

    /**
     * Waits for pending acks, nacks and returns from the server.
     * If there are no pending acks, the method returns immediately.
     *
     * @param int|float $timeout If set to value > 0 the method will wait at most $timeout seconds for pending acks.
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     */
    public function wait_for_pending_acks_returns($timeout = 0)
    {
        $functions = array(
            $this->waitHelper->get_wait('basic.ack'),
            $this->waitHelper->get_wait('basic.nack'),
            $this->waitHelper->get_wait('basic.return'),
        );

        $timeout = max(0, $timeout);
        while (!empty($this->published_messages)) {
            $this->wait($functions, false, $timeout);
        }
    }

    /**
     * Selects standard transaction mode
     *
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function tx_select()
    {
        $this->send_method_frame(array(90, 10));

        return $this->wait(array(
            $this->waitHelper->get_wait('tx.select_ok')
        ), false, $this->channel_rpc_timeout);
    }

    /**
     * Confirms transaction mode
     */
    protected function tx_select_ok()
    {
    }

    /**
     * @param int|null $ticket
     * @return int
     */
    protected function getTicket($ticket)
    {
        return (null === $ticket) ? $this->default_ticket : $ticket;
    }

    /**
     * Helper method to get a particular message from $this->published_messages; removes it from the array and returns it.
     *
     * @param int $index
     * @return AMQPMessage
     */
    protected function get_and_unset_message($index)
    {
        $message = $this->published_messages[$index];
        unset($this->published_messages[$index]);

        return $message;
    }

    /**
     * Sets callback for basic_return
     *
     * @param callable $callback
     * @throws \InvalidArgumentException if $callback is not callable
     */
    public function set_return_listener($callback)
    {
        Assert::isCallable($callback);
        $this->basic_return_callback = $callback;
    }

    /**
     * Sets a handler which is called for any message nack'ed by the server, with the AMQPMessage as first argument.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException
     */
    public function set_nack_handler($callback)
    {
        Assert::isCallable($callback);
        $this->nack_handler = $callback;
    }

    /**
     * Sets a handler which is called for any message ack'ed by the server, with the AMQPMessage as first argument.
     *
     * @param callable $callback
     * @throws \InvalidArgumentException
     */
    public function set_ack_handler($callback)
    {
        Assert::isCallable($callback);
        $this->ack_handler = $callback;
    }

    /**
     * @throws AMQPChannelClosedException
     * @throws AMQPConnectionClosedException
     * @throws AMQPConnectionBlockedException
     */
    private function checkConnection()
    {
        if ($this->connection === null || !$this->connection->isConnected()) {
            throw new AMQPChannelClosedException('Channel connection is closed.');
        }
        if ($this->connection->isBlocked()) {
            throw new AMQPConnectionBlockedException();
        }
    }

    /**
     * Wait and process all incoming messages in an endless loop,
     * until connection exception or manual stop using self::stopConsume()
     *
     * @param float $maximumPoll Maximum time in seconds between read attempts
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
     * @throws \ErrorException
     * @since 3.2.0
     */
    public function consume(float $maximumPoll = 10.0): void
    {
        $this->checkConnection();

        if ($this->stopConsume) {
            $this->stopConsume = false;
            return;
        }

        $timeout = $this->connection->getReadTimeout();
        $heartBeat = $this->connection->getHeartbeat();
        if ($heartBeat > 2) {
            $timeout = min($timeout, floor($heartBeat / 2));
        }
        $timeout = max(min($timeout, $maximumPoll), 1);
        while ($this->is_consuming() || !empty($this->method_queue)) {
            if ($this->stopConsume) {
                $this->stopConsume = false;
                return;
            }
            try {
                $this->wait(null, false, $timeout);
            } catch (AMQPTimeoutException $exception) {
                // something might be wrong, try to send heartbeat which involves select+write
                $this->connection->checkHeartBeat();
                continue;
            } catch (AMQPNoDataException $exception) {
                continue;
            }
        }
    }

    /**
     * Stop AMQPChannel::consume() loop. Useful for signal handlers and other interrupts.
     * @since 3.2.0
     */
    public function stopConsume()
    {
        $this->stopConsume = true;
    }
}
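
The way internal_ack_handler() and get_keys_less_or_equal() resolve a confirm that carries the "multiple" flag can be illustrated outside the class. The following is a minimal standalone sketch, not part of the library: the function name settle_confirm() and the string payloads are hypothetical, and pending messages are assumed to be held in an array keyed by delivery tag, as published_messages is above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of php-amqplib): settles a publisher confirm
 * against a map of pending messages keyed by delivery tag.
 *
 * With $multiple = true, every pending tag less than or equal to $deliveryTag
 * is settled; otherwise only the exact tag is.
 *
 * @param array<int, string> $pending pending messages, modified in place
 * @return array<int, string> the messages settled by this confirm
 */
function settle_confirm(array &$pending, int $deliveryTag, bool $multiple): array
{
    $settled = [];
    // array_keys() returns a copy, so unsetting entries inside the loop is safe.
    foreach (array_keys($pending) as $tag) {
        $matches = $multiple ? $tag <= $deliveryTag : $tag === $deliveryTag;
        if ($matches) {
            $settled[$tag] = $pending[$tag];
            unset($pending[$tag]);
        }
    }

    return $settled;
}

// Four messages are awaiting confirmation; the server confirms tag 3 with multiple = true.
$pending = [1 => 'a', 2 => 'b', 3 => 'c', 4 => 'd'];
$settled = settle_confirm($pending, 3, true);
// $settled now holds tags 1, 2 and 3; only tag 4 is still pending.
```

In the class itself each settled message is additionally passed to the registered ack or nack handler; the sketch returns the settled messages instead so that the behaviour is easy to inspect.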