Completed
Push — master ( e17409...9d6c1e )
by Sergey
04:06
created

Channel::publish()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 18
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 18
ccs 13
cts 13
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 12
nc 1
nop 4
crap 1
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\ChannelInterface;
6
use ButterAMQP\Confirm;
7
use ButterAMQP\Delivery;
8
use ButterAMQP\Exception\AMQPException;
9
use ButterAMQP\Exception\NoReturnException;
10
use ButterAMQP\Exception\TransactionNotSelectedException;
11
use ButterAMQP\Exception\UnknownConsumerTagException;
12
use ButterAMQP\AMQP091\Framing\Content;
13
use ButterAMQP\AMQP091\Framing\Frame;
14
use ButterAMQP\AMQP091\Framing\Header;
15
use ButterAMQP\AMQP091\Framing\Method\BasicAck;
16
use ButterAMQP\AMQP091\Framing\Method\BasicCancel;
17
use ButterAMQP\AMQP091\Framing\Method\BasicCancelOk;
18
use ButterAMQP\AMQP091\Framing\Method\BasicConsume;
19
use ButterAMQP\AMQP091\Framing\Method\BasicConsumeOk;
20
use ButterAMQP\AMQP091\Framing\Method\BasicDeliver;
21
use ButterAMQP\AMQP091\Framing\Method\BasicGet;
22
use ButterAMQP\AMQP091\Framing\Method\BasicGetEmpty;
23
use ButterAMQP\AMQP091\Framing\Method\BasicGetOk;
24
use ButterAMQP\AMQP091\Framing\Method\BasicNack;
25
use ButterAMQP\AMQP091\Framing\Method\BasicPublish;
26
use ButterAMQP\AMQP091\Framing\Method\BasicQos;
27
use ButterAMQP\AMQP091\Framing\Method\BasicQosOk;
28
use ButterAMQP\AMQP091\Framing\Method\BasicRecover;
29
use ButterAMQP\AMQP091\Framing\Method\BasicRecoverOk;
30
use ButterAMQP\AMQP091\Framing\Method\BasicReject;
31
use ButterAMQP\AMQP091\Framing\Method\BasicReturn;
32
use ButterAMQP\AMQP091\Framing\Method\ChannelClose;
33
use ButterAMQP\AMQP091\Framing\Method\ChannelCloseOk;
34
use ButterAMQP\AMQP091\Framing\Method\ChannelFlow;
35
use ButterAMQP\AMQP091\Framing\Method\ChannelFlowOk;
36
use ButterAMQP\AMQP091\Framing\Method\ChannelOpen;
37
use ButterAMQP\AMQP091\Framing\Method\ChannelOpenOk;
38
use ButterAMQP\AMQP091\Framing\Method\ConfirmSelect;
39
use ButterAMQP\AMQP091\Framing\Method\ConfirmSelectOk;
40
use ButterAMQP\AMQP091\Framing\Method\TxCommit;
41
use ButterAMQP\AMQP091\Framing\Method\TxCommitOk;
42
use ButterAMQP\AMQP091\Framing\Method\TxRollback;
43
use ButterAMQP\AMQP091\Framing\Method\TxRollbackOk;
44
use ButterAMQP\AMQP091\Framing\Method\TxSelect;
45
use ButterAMQP\AMQP091\Framing\Method\TxSelectOk;
46
use ButterAMQP\Message;
47
use ButterAMQP\Returned;
48
use ButterAMQP\WireInterface;
49
use ButterAMQP\WireSubscriberInterface;
50
use Psr\Log\LoggerAwareInterface;
51
use Psr\Log\LoggerAwareTrait;
52
use Psr\Log\NullLogger;
53
54
class Channel implements ChannelInterface, WireSubscriberInterface, LoggerAwareInterface
55
{
56
    use LoggerAwareTrait;
57
58
    const STATUS_CLOSED = 0;
59
    const STATUS_READY = 1;
60
    const STATUS_INACTIVE = 2;
61
62
    const MODE_NORMAL = 0;
63
    const MODE_CONFIRM = 1;
64
    const MODE_TX = 2;
65
66
    /**
67
     * @var int
68
     */
69
    private $id;
70
71
    /**
72
     * @var WireInterface
73
     */
74
    private $wire;
75
76
    /**
77
     * @var int
78
     */
79
    private $status = self::STATUS_CLOSED;
80
81
    /**
82
     * @var int
83
     */
84
    private $mode = self::MODE_NORMAL;
85
86
    /**
87
     * @var callable[]
88
     */
89
    private $consumers = [];
90
91
    /**
92
     * @var callable
93
     */
94
    private $returnCallable;
95
96
    /**
97
     * @var callable
98
     */
99
    private $confirmCallable;
100
101
    /**
102
     * @param WireInterface $wire
103
     * @param int           $id
104
     */
105 53
    public function __construct(WireInterface $wire, $id)
    {
        // Start with a no-op logger so $this->logger is never unset.
        $this->logger = new NullLogger();

        $this->wire = $wire;
        $this->id = $id;
    }
111
112
    /**
113
     * {@inheritdoc}
114
     */
115 19
    public function open()
    {
        // Any status other than CLOSED means there is nothing to open.
        if (self::STATUS_CLOSED != $this->status) {
            return $this;
        }

        // Subscribe to the wire for this channel id before sending the open request.
        $this->wire->subscribe($this->id, $this);

        $this->send(new ChannelOpen($this->id, ''));
        $this->wait(ChannelOpenOk::class);

        // A freshly opened channel is ready and in normal mode.
        $this->mode = self::MODE_NORMAL;
        $this->status = self::STATUS_READY;

        return $this;
    }
131
132
    /**
133
     * {@inheritdoc}
134
     */
135 1
    public function flow($active)
    {
        $this->send(new ChannelFlow($this->id, $active));

        /** @var ChannelFlowOk $reply */
        $reply = $this->wait(ChannelFlowOk::class);

        // The status follows the value reported in the reply, not the requested one.
        if ($reply->isActive()) {
            $this->status = self::STATUS_READY;
        } else {
            $this->status = self::STATUS_INACTIVE;
        }

        return $this;
    }
146
147
    /**
148
     * {@inheritdoc}
149
     */
150 1
    public function close()
    {
        $request = new ChannelClose($this->id, 0, '', 0, 0);

        $this->send($request);
        $this->wait(ChannelCloseOk::class);

        // Reached only after the close-ok frame has been received.
        $this->status = self::STATUS_CLOSED;

        return $this;
    }
159
160
    /**
161
     * {@inheritdoc}
162
     */
163 1
    public function qos($prefetchSize, $prefetchCount, $globally = false)
    {
        $request = new BasicQos($this->id, $prefetchSize, $prefetchCount, $globally);

        // Send the request and block until the matching qos-ok frame arrives.
        $this->send($request);
        $this->wait(BasicQosOk::class);

        return $this;
    }
170
171
    /**
172
     * {@inheritdoc}
173
     */
174 4
    public function exchange($name)
    {
        // Every call builds a new Exchange bound to this channel's wire and id.
        $exchange = new Exchange($this->wire, $this->id, $name);

        return $exchange;
    }
178
179
    /**
180
     * {@inheritdoc}
181
     */
182 14
    public function queue($name = '')
    {
        // Every call builds a new Queue bound to this channel's wire and id.
        $queue = new Queue($this->wire, $this->id, $name);

        return $queue;
    }
186
187
    /**
188
     * {@inheritdoc}
189
     */
190 11
    public function consume($queue, callable $callback, $flags = 0, $tag = '', array $arguments = [])
    {
        $noWait = (bool) ($flags & Consumer::FLAG_NO_WAIT);

        // In no-wait mode no BasicConsumeOk is awaited below, so an empty tag
        // is replaced with a locally generated one.
        if (empty($tag) && $noWait) {
            $tag = uniqid('php-consumer-');
        }

        // Every flag is reduced to a strict boolean, exactly as publish() does,
        // so the frame receives true/false rather than the raw masked integer.
        $this->send(new BasicConsume(
            $this->id,
            0,
            $queue,
            $tag,
            (bool) ($flags & Consumer::FLAG_NO_LOCAL),
            (bool) ($flags & Consumer::FLAG_NO_ACK),
            (bool) ($flags & Consumer::FLAG_EXCLUSIVE),
            $noWait,
            $arguments
        ));

        if (!$noWait) {
            // The tag carried by the reply replaces whatever was requested.
            $tag = $this->wait(BasicConsumeOk::class)
                ->getConsumerTag();
        }

        // Remember the callback so incoming deliveries for this tag can be routed to it.
        $this->consumers[$tag] = $callback;

        return new Consumer($this, $tag);
    }
217
218
    /**
219
     * {@inheritdoc}
220
     */
221 7
    public function get($queue, $withAck = true)
    {
        // The last argument of BasicGet is the negation of $withAck.
        $this->send(new BasicGet($this->id, 0, $queue, !$withAck));

        /** @var BasicGetOk|BasicGetEmpty $reply */
        $reply = $this->wait([BasicGetOk::class, BasicGetEmpty::class]);

        // An "empty" reply means there is no message to return.
        if ($reply instanceof BasicGetEmpty) {
            return null;
        }

        /** @var Header $header */
        $header = $this->wait(Header::class);

        // Keep appending content frames until the size announced by the header is reached.
        $body = '';
        while (strlen($body) < $header->getSize()) {
            $body .= $this->wait(Content::class)->getData();
        }

        // Messages fetched this way carry an empty consumer tag.
        $delivery = new Delivery(
            $this,
            '',
            $reply->getDeliveryTag(),
            $reply->isRedelivered(),
            $reply->getExchange(),
            $reply->getRoutingKey(),
            $body,
            $header->getProperties()
        );

        return $delivery;
    }
250
251
    /**
252
     * {@inheritdoc}
253
     */
254 2
    public function recover($requeue = true)
    {
        $request = new BasicRecover($this->id, $requeue);

        // Send the request and block until the recover-ok frame arrives.
        $this->send($request);
        $this->wait(BasicRecoverOk::class);

        return $this;
    }
261
262
    /**
263
     * {@inheritdoc}
264
     */
265 6
    public function cancel($tag, $flags = 0)
    {
        // Reduce the flag to a strict boolean, as publish() does, so the frame
        // receives true/false rather than the raw masked integer.
        $noWait = (bool) ($flags & Consumer::FLAG_NO_WAIT);

        $this->send(new BasicCancel($this->id, $tag, $noWait));

        // The callback is dropped right after the request is sent, before any reply is awaited.
        unset($this->consumers[$tag]);

        // A cancel-ok frame is awaited only when no-wait was not requested.
        if (!$noWait) {
            $this->wait(BasicCancelOk::class);
        }

        return $this;
    }
279
280
    /**
281
     * {@inheritdoc}
282
     */
283 15
    public function publish(Message $message, $exchange = '', $routingKey = '', $flags = 0)
    {
        $mandatory = (bool) ($flags & Message::FLAG_MANDATORY);
        $immediate = (bool) ($flags & Message::FLAG_IMMEDIATE);

        // A publish is three frames in a row: method, header, content.
        $this->send(new BasicPublish($this->id, 0, $exchange, $routingKey, $mandatory, $immediate));

        $payload = $message->getBody();

        // The header announces the payload length in bytes together with the message properties.
        $this->send(new Header($this->id, 60, 0, strlen($payload), $message->getProperties()));
        $this->send(new Content($this->id, $payload));

        return $this;
    }
301
302
    /**
303
     * {@inheritdoc}
304
     */
305 6
    public function ack($deliveryTag, $multiple = false)
    {
        // Sent without waiting for any reply frame.
        $frame = new BasicAck($this->id, $deliveryTag, $multiple);
        $this->send($frame);

        return $this;
    }
311
312
    /**
313
     * {@inheritdoc}
314
     */
315 4
    public function reject($deliveryTag, $requeue = true, $multiple = false)
    {
        // A "multiple" rejection goes out as BasicNack, a single one as BasicReject.
        if ($multiple) {
            $this->send(new BasicNack($this->id, $deliveryTag, $multiple, $requeue));
        } else {
            $this->send(new BasicReject($this->id, $deliveryTag, $requeue));
        }

        return $this;
    }
322
323
    /**
324
     * {@inheritdoc}
325
     */
326 2
    public function onReturn(callable $callable)
    {
        // Replaces any previously registered handler for returned messages.
        $this->returnCallable = $callable;

        return $this;
    }
332
333
    /**
334
     * {@inheritdoc}
335
     */
336 5
    public function selectConfirm(callable $callable, $noWait = false)
    {
        // The handler is stored before the request is sent.
        $this->confirmCallable = $callable;

        $this->send(new ConfirmSelect($this->id, $noWait));

        // With no-wait the mode is switched without waiting for select-ok.
        if ($noWait) {
            $this->mode = self::MODE_CONFIRM;

            return $this;
        }

        $this->wait(ConfirmSelectOk::class);
        $this->mode = self::MODE_CONFIRM;

        return $this;
    }
350
351
    /**
352
     * {@inheritdoc}
353
     */
354 7
    public function selectTx()
    {
        $this->send(new TxSelect($this->id));
        $this->wait(TxSelectOk::class);

        // The mode changes only after select-ok has been received.
        $this->mode = self::MODE_TX;

        return $this;
    }
363
364
    /**
365
     * {@inheritdoc}
366
     */
367 5
    public function txCommit()
    {
        // Committing is only allowed after selectTx() has switched the channel to TX mode.
        if ($this->mode != self::MODE_TX) {
            throw new TransactionNotSelectedException('Channel is not in transaction mode. Use Channel::selectTx() to select transaction mode on this channel.');
        }

        $this->send(new TxCommit($this->id))
            ->wait(TxCommitOk::class);

        // Return the channel itself, like selectTx() and the other mutators in this
        // class, so calls can be chained. The previous bare "return;" yielded null.
        return $this;
    }
378
379
    /**
380
     * {@inheritdoc}
381
     */
382 3
    public function txRollback()
    {
        // Rolling back is only allowed after selectTx() has switched the channel to TX mode.
        if ($this->mode != self::MODE_TX) {
            throw new TransactionNotSelectedException('Channel is not in transaction mode. Use Channel::selectTx() to select transaction mode on this channel.');
        }

        $this->send(new TxRollback($this->id))
            ->wait(TxRollbackOk::class);

        // Return the channel itself, like selectTx() and the other mutators in this
        // class, so calls can be chained. The previous bare "return;" yielded null.
        return $this;
    }
393
394
    /**
395
     * {@inheritdoc}
396
     */
397 10
    public function hasConsumer($tag)
    {
        // The tag is cast to string before being used as the lookup key.
        $key = (string) $tag;

        return isset($this->consumers[$key]);
    }
401
402
    /**
403
     * {@inheritdoc}
404
     */
405 3
    public function getConsumerTags()
    {
        // Consumers are stored keyed by tag, so the keys are the tag list.
        $tags = array_keys($this->consumers);

        return $tags;
    }
409
410
    /**
411
     * @return int
412
     */
413 3
    public function getStatus()
    {
        // Holds one of the STATUS_* constants of this class.
        $status = $this->status;

        return $status;
    }
417
418
    /**
419
     * @return int
420
     */
421
    public function getMode()
    {
        // Holds one of the MODE_* constants of this class.
        $mode = $this->mode;

        return $mode;
    }
425
426
    /**
427
     * Sends frame to the server.
428
     *
429
     * @param Frame $frame
430
     *
431
     * @return $this
432
     */
433 46
    private function send(Frame $frame)
    {
        $wire = $this->wire;
        $wire->send($frame);

        // Returning the channel lets callers chain ->wait() directly onto send().
        return $this;
    }
439
440
    /**
441
     * @param string|array $type
442
     *
443
     * @return Frame
444
     */
445 37
    private function wait($type)
    {
        // Delegates to the wire, scoped to this channel's id.
        $frame = $this->wire->wait($this->id, $type);

        return $frame;
    }
449
450
    /**
451
     * @param Frame $frame
452
     */
453 29
    public function dispatch(Frame $frame)
    {
        // Route the frame to the handler for its type. The checks run in a fixed
        // order and the first match wins; any other frame type is ignored.
        if ($frame instanceof ChannelClose) {
            $this->onChannelClose($frame);

            return;
        }

        if ($frame instanceof ChannelFlow) {
            $this->onChannelFlow($frame);

            return;
        }

        if ($frame instanceof BasicDeliver) {
            $this->onBasicDeliver($frame);

            return;
        }

        if ($frame instanceof BasicReturn) {
            $this->onBasicReturn($frame);

            return;
        }

        if ($frame instanceof BasicAck) {
            $this->onBasicAck($frame);

            return;
        }

        if ($frame instanceof BasicNack) {
            $this->onBasicNack($frame);

            return;
        }

        if ($frame instanceof BasicCancel) {
            $this->onBasicCancel($frame);
        }
    }
471
472
    /**
473
     * @param BasicDeliver $frame
474
     *
475
     * @throws \Exception
476
     */
477 6
    private function onBasicDeliver(BasicDeliver $frame)
    {
        /** @var Header $header */
        $header = $this->wait(Header::class);

        // The header and all content frames are read before the consumer lookup,
        // so they are consumed even when the tag turns out to be unknown.
        $body = '';
        while (strlen($body) < $header->getSize()) {
            $body .= $this->wait(Content::class)->getData();
        }

        $tag = $frame->getConsumerTag();

        if (!isset($this->consumers[$tag])) {
            $message = sprintf('Consumer with tag "%s" does not exist', $tag);

            throw new UnknownConsumerTagException($message);
        }

        $delivery = new Delivery(
            $this,
            $tag,
            $frame->getDeliveryTag(),
            $frame->isRedelivered(),
            $frame->getExchange(),
            $frame->getRoutingKey(),
            $body,
            $header->getProperties()
        );

        // Hand the delivery to the callback registered for this tag.
        call_user_func($this->consumers[$tag], $delivery);
    }
507
508
    /**
509
     * @param BasicReturn $frame
510
     *
511
     * @throws \Exception
512
     */
513 4
    private function onBasicReturn(BasicReturn $frame)
    {
        /** @var Header $header */
        $header = $this->wait(Header::class);
        $content = '';

        // The header and all content frames are read before the handler check,
        // so they are consumed even when no handler is registered.
        while ($header->getSize() > strlen($content)) {
            $content .= $this->wait(Content::class)->getData();
        }

        if (!$this->returnCallable) {
            // The message previously read "... method, , or remove ..." because two
            // concatenated fragments each contributed a comma.
            throw new NoReturnException(
                'A message was returned but there is no return handler. '.
                'Make sure you setup a handler for returned messages using Channel::onReturn method, '.
                'or remove MANDATORY and IMMEDIATE flags when publishing messages.'
            );
        }

        $returned = new Returned(
            $frame->getReplyCode(),
            $frame->getReplyText(),
            $frame->getExchange(),
            $frame->getRoutingKey(),
            $content,
            $header->getProperties()
        );

        // Hand the returned message to the handler registered through onReturn().
        call_user_func($this->returnCallable, $returned);
    }
542
543
    /**
544
     * @param BasicAck $frame
545
     */
546 3 View Code Duplication
    private function onBasicAck(BasicAck $frame)
    {
        // Ack frame: report a positive confirmation.
        $this->notifyConfirm(true, $frame->getDeliveryTag(), $frame->isMultiple());
    }

    /**
     * @param BasicNack $frame
     */
    private function onBasicNack(BasicNack $frame)
    {
        // Nack frame: report a negative confirmation.
        $this->notifyConfirm(false, $frame->getDeliveryTag(), $frame->isMultiple());
    }

    /**
     * Passes a Confirm object to the registered confirm callable.
     *
     * Shared by onBasicAck() and onBasicNack(), whose bodies differed only in
     * the boolean passed to Confirm.
     *
     * @param bool  $ok          true for an ack frame, false for a nack frame
     * @param mixed $deliveryTag delivery tag taken from the frame
     * @param bool  $multiple    "multiple" flag taken from the frame
     *
     * @throws \RuntimeException when no confirm callable has been registered
     */
    private function notifyConfirm($ok, $deliveryTag, $multiple)
    {
        if (!$this->confirmCallable) {
            throw new \RuntimeException(
                'Something is wrong: channel is in confirm mode, but confirm callable is not set'
            );
        }

        call_user_func($this->confirmCallable, new Confirm($ok, $deliveryTag, $multiple));
    }
570
571
    /**
572
     * @param BasicCancel $frame
573
     */
574 3
    private function onBasicCancel(BasicCancel $frame)
    {
        $tag = $frame->getConsumerTag();

        // Drop the callback for the consumer named in the cancel frame.
        unset($this->consumers[$tag]);

        // No reply is sent when the frame carries the no-wait flag.
        if ($frame->isNoWait()) {
            return;
        }

        $this->send(new BasicCancelOk($this->id, $tag));
    }
582
583
    /**
584
     * @param ChannelFlow $frame
585
     */
586 1
    private function onChannelFlow(ChannelFlow $frame)
    {
        $active = $frame->isActive();

        // Reply with the same active value, then mirror it in the local status.
        $this->send(new ChannelFlowOk($this->id, $active));

        if ($active) {
            $this->status = self::STATUS_READY;
        } else {
            $this->status = self::STATUS_INACTIVE;
        }
    }
592
593
    /**
594
     * @param ChannelClose $frame
595
     *
596
     * @throws AMQPException
597
     */
598 3
    private function onChannelClose(ChannelClose $frame)
    {
        // Send close-ok and mark the channel closed before raising the error.
        $this->send(new ChannelCloseOk($this->id));
        $this->status = self::STATUS_CLOSED;

        // The exception is built from the reply text and code carried by the close frame.
        $exception = AMQPException::make($frame->getReplyText(), $frame->getReplyCode());

        throw $exception;
    }
606
}
607