Completed
Push — master ( 79150a...2528a8 )
by Sergey
03:52
created

Channel::onBasicNack()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 10
Ratio 100 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
dl 10
loc 10
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 1
crap 2
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\ChannelInterface;
6
use ButterAMQP\Confirm;
7
use ButterAMQP\Delivery;
8
use ButterAMQP\Exception\AMQPException;
9
use ButterAMQP\Exception\NoReturnException;
10
use ButterAMQP\Exception\TransactionNotSelectedException;
11
use ButterAMQP\Exception\UnknownConsumerTagException;
12
use ButterAMQP\AMQP091\Framing\Content;
13
use ButterAMQP\AMQP091\Framing\Frame;
14
use ButterAMQP\AMQP091\Framing\Header;
15
use ButterAMQP\AMQP091\Framing\Method\BasicAck;
16
use ButterAMQP\AMQP091\Framing\Method\BasicCancel;
17
use ButterAMQP\AMQP091\Framing\Method\BasicCancelOk;
18
use ButterAMQP\AMQP091\Framing\Method\BasicConsume;
19
use ButterAMQP\AMQP091\Framing\Method\BasicConsumeOk;
20
use ButterAMQP\AMQP091\Framing\Method\BasicDeliver;
21
use ButterAMQP\AMQP091\Framing\Method\BasicGet;
22
use ButterAMQP\AMQP091\Framing\Method\BasicGetEmpty;
23
use ButterAMQP\AMQP091\Framing\Method\BasicGetOk;
24
use ButterAMQP\AMQP091\Framing\Method\BasicNack;
25
use ButterAMQP\AMQP091\Framing\Method\BasicPublish;
26
use ButterAMQP\AMQP091\Framing\Method\BasicQos;
27
use ButterAMQP\AMQP091\Framing\Method\BasicQosOk;
28
use ButterAMQP\AMQP091\Framing\Method\BasicRecover;
29
use ButterAMQP\AMQP091\Framing\Method\BasicRecoverOk;
30
use ButterAMQP\AMQP091\Framing\Method\BasicReject;
31
use ButterAMQP\AMQP091\Framing\Method\BasicReturn;
32
use ButterAMQP\AMQP091\Framing\Method\ChannelClose;
33
use ButterAMQP\AMQP091\Framing\Method\ChannelCloseOk;
34
use ButterAMQP\AMQP091\Framing\Method\ChannelFlow;
35
use ButterAMQP\AMQP091\Framing\Method\ChannelFlowOk;
36
use ButterAMQP\AMQP091\Framing\Method\ChannelOpen;
37
use ButterAMQP\AMQP091\Framing\Method\ChannelOpenOk;
38
use ButterAMQP\AMQP091\Framing\Method\ConfirmSelect;
39
use ButterAMQP\AMQP091\Framing\Method\ConfirmSelectOk;
40
use ButterAMQP\AMQP091\Framing\Method\TxCommit;
41
use ButterAMQP\AMQP091\Framing\Method\TxCommitOk;
42
use ButterAMQP\AMQP091\Framing\Method\TxRollback;
43
use ButterAMQP\AMQP091\Framing\Method\TxRollbackOk;
44
use ButterAMQP\AMQP091\Framing\Method\TxSelect;
45
use ButterAMQP\AMQP091\Framing\Method\TxSelectOk;
46
use ButterAMQP\Message;
47
use ButterAMQP\Returned;
48
49
class Channel implements ChannelInterface, WireSubscriberInterface
50
{
51
    const STATUS_CLOSED = 0;
52
    const STATUS_READY = 1;
53
    const STATUS_INACTIVE = 2;
54
55
    const MODE_NORMAL = 0;
56
    const MODE_CONFIRM = 1;
57
    const MODE_TX = 2;
58
59
    /**
60
     * @var int
61
     */
62
    private $id;
63
64
    /**
65
     * @var WireInterface
66
     */
67
    private $wire;
68
69
    /**
70
     * @var int
71
     */
72
    private $status = self::STATUS_CLOSED;
73
74
    /**
75
     * @var int
76
     */
77
    private $mode = self::MODE_NORMAL;
78
79
    /**
80
     * @var callable[]
81
     */
82
    private $consumers = [];
83
84
    /**
85
     * @var callable
86
     */
87
    private $returnCallable;
88
89
    /**
90
     * @var callable
91
     */
92
    private $confirmCallable;
93
94
    /**
95
     * @param WireInterface $wire
96
     * @param int           $id
97
     */
98 56
    public function __construct(WireInterface $wire, $id)
99
    {
100 56
        $this->id = $id;
101 56
        $this->wire = $wire;
102 56
    }
103
104
    /**
105
     * {@inheritdoc}
106
     */
107 19
    public function open()
108
    {
109 19
        if ($this->status != self::STATUS_CLOSED) {
110 1
            return $this;
111
        }
112
113 19
        $this->wire->subscribe($this->id, $this);
114
115 19
        $this->send(new ChannelOpen($this->id, ''))
116 19
            ->wait(ChannelOpenOk::class);
117
118 19
        $this->status = self::STATUS_READY;
119 19
        $this->mode = self::MODE_NORMAL;
120
121 19
        return $this;
122
    }
123
124
    /**
125
     * {@inheritdoc}
126
     */
127 1
    public function flow($active)
128
    {
129
        /** @var ChannelFlowOk $frame */
130 1
        $frame = $this->send(new ChannelFlow($this->id, $active))
131 1
            ->wait(ChannelFlowOk::class);
132
133 1
        $this->status = $frame->isActive() ? self::STATUS_READY :
134 1
            self::STATUS_INACTIVE;
135
136 1
        return $this;
137
    }
138
139
    /**
140
     * {@inheritdoc}
141
     */
142 1
    public function serve($blocking = true)
143
    {
144 1
        $this->wire->next($blocking);
145
146 1
        return $this;
147
    }
148
149
    /**
150
     * {@inheritdoc}
151
     */
152 1
    public function close()
153
    {
154 1
        $this->send(new ChannelClose($this->id, 0, '', 0, 0))
155 1
            ->wait(ChannelCloseOk::class);
156
157 1
        $this->status = self::STATUS_CLOSED;
158
159 1
        return $this;
160
    }
161
162
    /**
163
     * {@inheritdoc}
164
     */
165 1
    public function qos($prefetchSize, $prefetchCount, $globally = false)
166
    {
167 1
        $this->send(new BasicQos($this->id, $prefetchSize, $prefetchCount, $globally))
168 1
            ->wait(BasicQosOk::class);
169
170 1
        return $this;
171
    }
172
173
    /**
174
     * {@inheritdoc}
175
     */
176 4
    public function exchange($name)
177
    {
178 4
        return new Exchange($this->wire, $this->id, $name);
179
    }
180
181
    /**
182
     * {@inheritdoc}
183
     */
184 14
    public function queue($name = '')
185
    {
186 14
        return new Queue($this->wire, $this->id, $name);
187
    }
188
189
    /**
190
     * {@inheritdoc}
191
     */
192 11
    public function consume($queue, callable $callback, $flags = 0, $tag = '', array $arguments = [])
193
    {
194 11
        if (empty($tag) && $flags & Consumer::FLAG_NO_WAIT) {
195 1
            $tag = uniqid('php-consumer-');
196 1
        }
197
198 11
        $this->send(new BasicConsume(
199 11
            $this->id,
200 11
            0,
201 11
            $queue,
202 11
            $tag,
203 11
            $flags & Consumer::FLAG_NO_LOCAL,
204 11
            $flags & Consumer::FLAG_NO_ACK,
205 11
            $flags & Consumer::FLAG_EXCLUSIVE,
206 11
            $flags & Consumer::FLAG_NO_WAIT,
207
            $arguments
208 11
        ));
209
210 11
        if (!($flags & Consumer::FLAG_NO_WAIT)) {
211 7
            $tag = $this->wait(BasicConsumeOk::class)
212 7
                ->getConsumerTag();
213 7
        }
214
215 11
        $this->consumers[$tag] = $callback;
216
217 11
        return new Consumer($this, $tag);
218
    }
219
220
    /**
221
     * {@inheritdoc}
222
     */
223 7
    public function get($queue, $withAck = true)
224
    {
225
        /** @var BasicGetOk|BasicGetEmpty $frame */
226 7
        $frame = $this->send(new BasicGet($this->id, 0, $queue, !$withAck))
227 7
            ->wait([BasicGetOk::class, BasicGetEmpty::class]);
228
229 7
        if ($frame instanceof BasicGetEmpty) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\BasicGetEmpty does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in ìnstanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
230 2
            return null;
231
        }
232
233
        /** @var Header $header */
234 5
        $header = $this->wait(Header::class);
235 5
        $content = '';
236
237 5
        while ($header->getSize() > strlen($content)) {
238 3
            $content .= $this->wait(Content::class)->getData();
239 3
        }
240
241 5
        return new Delivery(
242 5
            $this,
243 5
            '',
244 5
            $frame->getDeliveryTag(),
245 5
            $frame->isRedelivered(),
246 5
            $frame->getExchange(),
247 5
            $frame->getRoutingKey(),
248 5
            $content,
249 5
            $header->getProperties()
250 5
        );
251
    }
252
253
    /**
254
     * {@inheritdoc}
255
     */
256 2
    public function recover($requeue = true)
257
    {
258 2
        $this->send(new BasicRecover($this->id, $requeue))
259 2
            ->wait(BasicRecoverOk::class);
260
261 2
        return $this;
262
    }
263
264
    /**
265
     * {@inheritdoc}
266
     */
267 6
    public function cancel($tag, $flags = 0)
268
    {
269 6
        $this->send(new BasicCancel($this->id, $tag, $flags & Consumer::FLAG_NO_WAIT));
270
271 6
        unset($this->consumers[$tag]);
272
273 6
        if ($flags & Consumer::FLAG_NO_WAIT) {
274 1
            return $this;
275
        }
276
277 5
        $this->wait(BasicCancelOk::class);
278
279 5
        return $this;
280
    }
281
282
    /**
283
     * {@inheritdoc}
284
     */
285 15
    public function publish(Message $message, $exchange = '', $routingKey = '', $flags = 0)
286
    {
287 15
        $this->send(new BasicPublish(
288 15
            $this->id,
289 15
            0,
290 15
            $exchange,
291 15
            $routingKey,
292 15
            (bool) ($flags & Message::FLAG_MANDATORY),
293 15
            (bool) ($flags & Message::FLAG_IMMEDIATE)
294 15
        ));
295
296 15
        $body = $message->getBody();
297
298 15
        $this->send(new Header($this->id, 60, 0, strlen($body), $message->getProperties()));
299 15
        $this->send(new Content($this->id, $body));
300
301 15
        return $this;
302
    }
303
304
    /**
305
     * {@inheritdoc}
306
     */
307 6
    public function ack($deliveryTag, $multiple = false)
308
    {
309 6
        $this->send(new BasicAck($this->id, $deliveryTag, $multiple));
310
311 6
        return $this;
312
    }
313
314
    /**
315
     * {@inheritdoc}
316
     */
317 4
    public function reject($deliveryTag, $requeue = true, $multiple = false)
318
    {
319 4
        $multiple ? $this->send(new BasicNack($this->id, $deliveryTag, $multiple, $requeue)) :
320 3
            $this->send(new BasicReject($this->id, $deliveryTag, $requeue));
321
322 4
        return $this;
323
    }
324
325
    /**
326
     * {@inheritdoc}
327
     */
328 2
    public function onReturn(callable $callable)
329
    {
330 2
        $this->returnCallable = $callable;
331
332 2
        return $this;
333
    }
334
335
    /**
336
     * {@inheritdoc}
337
     */
338 5
    public function selectConfirm(callable $callable, $noWait = false)
339
    {
340 5
        $this->confirmCallable = $callable;
341
342 5
        $this->send(new ConfirmSelect($this->id, $noWait));
343
344 5
        if (!$noWait) {
345 4
            $this->wait(ConfirmSelectOk::class);
346 4
        }
347
348 5
        $this->mode = self::MODE_CONFIRM;
349
350 5
        return $this;
351
    }
352
353
    /**
354
     * {@inheritdoc}
355
     */
356 7
    public function selectTx()
357
    {
358 7
        $this->send(new TxSelect($this->id))
359 7
            ->wait(TxSelectOk::class);
360
361 7
        $this->mode = self::MODE_TX;
362
363 7
        return $this;
364
    }
365
366
    /**
367
     * {@inheritdoc}
368
     */
369 6
    public function txCommit()
370
    {
371 6
        if ($this->mode != self::MODE_TX) {
372 1
            throw new TransactionNotSelectedException('Channel is not in transaction mode. Use Channel::selectTx() to select transaction mode on this channel.');
373
        }
374
375 5
        $this->send(new TxCommit($this->id))
376 5
            ->wait(TxCommitOk::class);
377
378 5
        return;
379
    }
380
381
    /**
382
     * {@inheritdoc}
383
     */
384 4
    public function txRollback()
385
    {
386 4
        if ($this->mode != self::MODE_TX) {
387 1
            throw new TransactionNotSelectedException('Channel is not in transaction mode. Use Channel::selectTx() to select transaction mode on this channel.');
388
        }
389
390 3
        $this->send(new TxRollback($this->id))
391 3
            ->wait(TxRollbackOk::class);
392
393 3
        return;
394
    }
395
396
    /**
397
     * {@inheritdoc}
398
     */
399 10
    public function hasConsumer($tag)
400
    {
401 10
        return isset($this->consumers[(string) $tag]);
402
    }
403
404
    /**
405
     * {@inheritdoc}
406
     */
407 3
    public function getConsumerTags()
408
    {
409 3
        return array_keys($this->consumers);
410
    }
411
412
    /**
413
     * @return string
414
     */
415 3
    public function getStatus()
416
    {
417 3
        return $this->status;
418
    }
419
420
    /**
421
     * Sends frame to the server.
422
     *
423
     * @param Frame $frame
424
     *
425
     * @return $this
426
     */
427 46
    private function send(Frame $frame)
428
    {
429 46
        $this->wire->send($frame);
430
431 46
        return $this;
432
    }
433
434
    /**
435
     * @param string|array $type
436
     *
437
     * @return Frame
438
     */
439 37
    private function wait($type)
440
    {
441 37
        return $this->wire->wait($this->id, $type);
442
    }
443
444
    /**
445
     * @param Frame $frame
446
     */
447 29
    public function dispatch(Frame $frame)
448
    {
449 29
        if ($frame instanceof ChannelClose) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ChannelClose does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in ìnstanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
450 3
            $this->onChannelClose($frame);
451 28
        } elseif ($frame instanceof ChannelFlow) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ChannelFlow does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in ìnstanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
452 1
            $this->onChannelFlow($frame);
453 28
        } elseif ($frame instanceof BasicDeliver) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\BasicDeliver does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in ìnstanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
454 6
            $this->onBasicDeliver($frame);
455 26
        } elseif ($frame instanceof BasicReturn) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\BasicReturn does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in ìnstanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
456 4
            $this->onBasicReturn($frame);
457 24
        } elseif ($frame instanceof BasicAck) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\BasicAck does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in ìnstanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
458 3
            $this->onBasicAck($frame);
459 22
        } elseif ($frame instanceof BasicNack) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\BasicNack does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in ìnstanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
460 2
            $this->onBasicNack($frame);
461 20
        } elseif ($frame instanceof BasicCancel) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\BasicCancel does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in ìnstanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
462 3
            $this->onBasicCancel($frame);
463 3
        }
464 24
    }
465
466
    /**
467
     * @param BasicDeliver $frame
468
     *
469
     * @throws \Exception
470
     */
471 6
    private function onBasicDeliver(BasicDeliver $frame)
472
    {
473
        /** @var Header $header */
474 6
        $header = $this->wait(Header::class);
475 6
        $content = '';
476
477 6
        while ($header->getSize() > strlen($content)) {
478 6
            $content .= $this->wait(Content::class)->getData();
479 6
        }
480
481 6
        if (!isset($this->consumers[$frame->getConsumerTag()])) {
482 1
            throw new UnknownConsumerTagException(sprintf(
483 1
                'Consumer with tag "%s" does not exist',
484 1
                $frame->getConsumerTag()
485 1
            ));
486
        }
487
488 5
        $delivery = new Delivery(
489 5
            $this,
490 5
            $frame->getConsumerTag(),
491 5
            $frame->getDeliveryTag(),
492 5
            $frame->isRedelivered(),
493 5
            $frame->getExchange(),
494 5
            $frame->getRoutingKey(),
495 5
            $content,
496 5
            $header->getProperties()
497 5
        );
498
499 5
        call_user_func($this->consumers[$frame->getConsumerTag()], $delivery);
500 5
    }
501
502
    /**
503
     * @param BasicReturn $frame
504
     *
505
     * @throws \Exception
506
     */
507 4
    private function onBasicReturn(BasicReturn $frame)
508
    {
509
        /** @var Header $header */
510 4
        $header = $this->wait(Header::class);
511 4
        $content = '';
512
513 4
        while ($header->getSize() > strlen($content)) {
514 3
            $content .= $this->wait(Content::class)->getData();
515 3
        }
516
517 4
        if (!$this->returnCallable) {
518 2
            throw new NoReturnException(
519
                'A message was returned but there is no return handler. '.
520 2
                'Make sure you setup a handler for returned messages using Channel::onReturn method, '.
521
                ', or remove MANDATORY and IMMEDIATE flags when publishing messages.'
522 2
            );
523
        }
524
525 2
        $returned = new Returned(
526 2
            $frame->getReplyCode(),
527 2
            $frame->getReplyText(),
528 2
            $frame->getExchange(),
529 2
            $frame->getRoutingKey(),
530 2
            $content,
531 2
            $header->getProperties()
532 2
        );
533
534 2
        call_user_func($this->returnCallable, $returned);
535 2
    }
536
537
    /**
538
     * @param BasicAck $frame
539
     */
540 3 View Code Duplication
    private function onBasicAck(BasicAck $frame)
1 ignored issue
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
541
    {
542 3
        if (!$this->confirmCallable) {
543 1
            throw new \RuntimeException(
544
                'Something is wrong: channel is in confirm mode, but confirm callable is not set'
545 1
            );
546
        }
547
548 2
        call_user_func($this->confirmCallable, new Confirm(true, $frame->getDeliveryTag(), $frame->isMultiple()));
549 2
    }
550
551
    /**
552
     * @param BasicNack $frame
553
     */
554 2 View Code Duplication
    private function onBasicNack(BasicNack $frame)
1 ignored issue
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
555
    {
556 2
        if (!$this->confirmCallable) {
557 1
            throw new \RuntimeException(
558
                'Something is wrong: channel is in confirm mode, but confirm callable is not set'
559 1
            );
560
        }
561
562 1
        call_user_func($this->confirmCallable, new Confirm(false, $frame->getDeliveryTag(), $frame->isMultiple()));
563 1
    }
564
565
    /**
566
     * @param BasicCancel $frame
567
     */
568 3
    private function onBasicCancel(BasicCancel $frame)
569
    {
570 3
        unset($this->consumers[$frame->getConsumerTag()]);
571
572 3
        if (!$frame->isNoWait()) {
573 1
            $this->send(new BasicCancelOk($this->id, $frame->getConsumerTag()));
574 1
        }
575 3
    }
576
577
    /**
578
     * @param ChannelFlow $frame
579
     */
580 1
    private function onChannelFlow(ChannelFlow $frame)
581
    {
582 1
        $this->send(new ChannelFlowOk($this->id, $frame->isActive()));
583
584 1
        $this->status = $frame->isActive() ? self::STATUS_READY : self::STATUS_INACTIVE;
585 1
    }
586
587
    /**
588
     * @param ChannelClose $frame
589
     *
590
     * @throws AMQPException
591
     */
592 3
    private function onChannelClose(ChannelClose $frame)
593
    {
594 3
        $this->send(new ChannelCloseOk($this->id));
595
596 3
        $this->status = self::STATUS_CLOSED;
597
598 3
        throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
599
    }
600
}
601