Completed
Push — master ( fdb7de...e17409 )
by Sergey
03:32
created

Channel::open()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 0
loc 16
ccs 9
cts 9
cp 1
rs 9.4285
cc 2
eloc 9
nc 2
nop 0
crap 2
1
<?php
2
3
namespace ButterAMQP;
4
5
use ButterAMQP\Exception\AMQPException;
6
use ButterAMQP\Exception\NoReturnException;
7
use ButterAMQP\Exception\TransactionNotSelectedException;
8
use ButterAMQP\Exception\UnknownConsumerTagException;
9
use ButterAMQP\Framing\Content;
10
use ButterAMQP\Framing\Frame;
11
use ButterAMQP\Framing\Header;
12
use ButterAMQP\Framing\Method\BasicAck;
13
use ButterAMQP\Framing\Method\BasicCancel;
14
use ButterAMQP\Framing\Method\BasicCancelOk;
15
use ButterAMQP\Framing\Method\BasicConsume;
16
use ButterAMQP\Framing\Method\BasicConsumeOk;
17
use ButterAMQP\Framing\Method\BasicDeliver;
18
use ButterAMQP\Framing\Method\BasicGet;
19
use ButterAMQP\Framing\Method\BasicGetEmpty;
20
use ButterAMQP\Framing\Method\BasicGetOk;
21
use ButterAMQP\Framing\Method\BasicNack;
22
use ButterAMQP\Framing\Method\BasicPublish;
23
use ButterAMQP\Framing\Method\BasicQos;
24
use ButterAMQP\Framing\Method\BasicQosOk;
25
use ButterAMQP\Framing\Method\BasicRecover;
26
use ButterAMQP\Framing\Method\BasicRecoverOk;
27
use ButterAMQP\Framing\Method\BasicReject;
28
use ButterAMQP\Framing\Method\BasicReturn;
29
use ButterAMQP\Framing\Method\ChannelClose;
30
use ButterAMQP\Framing\Method\ChannelCloseOk;
31
use ButterAMQP\Framing\Method\ChannelFlow;
32
use ButterAMQP\Framing\Method\ChannelFlowOk;
33
use ButterAMQP\Framing\Method\ChannelOpen;
34
use ButterAMQP\Framing\Method\ChannelOpenOk;
35
use ButterAMQP\Framing\Method\ConfirmSelect;
36
use ButterAMQP\Framing\Method\ConfirmSelectOk;
37
use ButterAMQP\Framing\Method\TxCommit;
38
use ButterAMQP\Framing\Method\TxCommitOk;
39
use ButterAMQP\Framing\Method\TxRollback;
40
use ButterAMQP\Framing\Method\TxRollbackOk;
41
use ButterAMQP\Framing\Method\TxSelect;
42
use ButterAMQP\Framing\Method\TxSelectOk;
43
use Psr\Log\LoggerAwareInterface;
44
use Psr\Log\LoggerAwareTrait;
45
use Psr\Log\NullLogger;
46
47
class Channel implements ChannelInterface, WireSubscriberInterface, LoggerAwareInterface
48
{
49
    use LoggerAwareTrait;
50
51
    const STATUS_CLOSED = 0;
52
    const STATUS_READY = 1;
53
    const STATUS_INACTIVE = 2;
54
55
    const MODE_NORMAL = 0;
56
    const MODE_CONFIRM = 1;
57
    const MODE_TX = 2;
58
59
    /**
60
     * @var int
61
     */
62
    private $id;
63
64
    /**
65
     * @var WireInterface
66
     */
67
    private $wire;
68
69
    /**
70
     * @var int
71
     */
72
    private $status = self::STATUS_CLOSED;
73
74
    /**
75
     * @var int
76
     */
77
    private $mode = self::MODE_NORMAL;
78
79
    /**
80
     * @var callable[]
81
     */
82
    private $consumers = [];
83
84
    /**
85
     * @var callable
86
     */
87
    private $returnCallable;
88
89
    /**
90
     * @var callable
91
     */
92
    private $confirmCallable;
93
94
    /**
     * Creates a channel that talks over the given wire.
     *
     * A NullLogger is installed as the default logger.
     *
     * @param WireInterface $wire Wire used to send frames and to wait for replies
     * @param int           $id   Channel id passed to every frame this channel builds
     */
    public function __construct(WireInterface $wire, $id)
    {
        $this->logger = new NullLogger();
        $this->wire = $wire;
        $this->id = $id;
    }
104
105
    /**
     * {@inheritdoc}
     *
     * Does nothing unless the channel is currently closed. Otherwise it
     * subscribes this channel to the wire, sends ChannelOpen, waits for
     * ChannelOpenOk and marks the channel as ready in normal mode.
     */
    public function open()
    {
        if ($this->status == self::STATUS_CLOSED) {
            $this->wire->subscribe($this->id, $this);

            $this->send(new ChannelOpen($this->id, ''));
            $this->wait(ChannelOpenOk::class);

            $this->mode = self::MODE_NORMAL;
            $this->status = self::STATUS_READY;
        }

        return $this;
    }
124
125
    /**
     * {@inheritdoc}
     *
     * Sends ChannelFlow with the requested state, waits for ChannelFlowOk and
     * stores the state reported by the reply (ready or inactive).
     */
    public function flow($active)
    {
        $this->send(new ChannelFlow($this->id, $active));

        /** @var ChannelFlowOk $reply */
        $reply = $this->wait(ChannelFlowOk::class);

        // The reply, not the request, decides the resulting status.
        if ($reply->isActive()) {
            $this->status = self::STATUS_READY;
        } else {
            $this->status = self::STATUS_INACTIVE;
        }

        return $this;
    }
139
140
    /**
     * {@inheritdoc}
     *
     * Sends ChannelClose, waits for ChannelCloseOk and marks the channel closed.
     */
    public function close()
    {
        $request = new ChannelClose($this->id, 0, '', 0, 0);

        $this->send($request);
        $this->wait(ChannelCloseOk::class);

        $this->status = self::STATUS_CLOSED;

        return $this;
    }
152
153
    /**
     * {@inheritdoc}
     *
     * Sends BasicQos with the given prefetch settings and waits for BasicQosOk.
     */
    public function qos($prefetchSize, $prefetchCount, $globally = false)
    {
        $request = new BasicQos($this->id, $prefetchSize, $prefetchCount, $globally);

        $this->send($request);
        $this->wait(BasicQosOk::class);

        return $this;
    }
163
164
    /**
     * {@inheritdoc}
     *
     * Builds a new Exchange object sharing this channel's wire and id.
     */
    public function exchange($name)
    {
        $exchange = new Exchange($this->wire, $this->id, $name);

        return $exchange;
    }
171
172
    /**
     * {@inheritdoc}
     *
     * Builds a new Queue object sharing this channel's wire and id.
     */
    public function queue($name = '')
    {
        $queue = new Queue($this->wire, $this->id, $name);

        return $queue;
    }
179
180
    /**
     * {@inheritdoc}
     *
     * Sends BasicConsume for the queue and registers the callback under the
     * consumer tag. Without FLAG_NO_WAIT the tag is taken from the
     * BasicConsumeOk reply; with FLAG_NO_WAIT no reply is awaited, so a missing
     * tag is generated locally.
     *
     * @param string   $queue     Queue to consume from
     * @param callable $callback  Invoked with a Delivery for every message delivered under the tag
     * @param int      $flags     Bit mask of Consumer::FLAG_* constants
     * @param string   $tag       Consumer tag; an empty string means "not specified"
     * @param array    $arguments Extra arguments passed through to BasicConsume
     *
     * @return Consumer
     */
    public function consume($queue, callable $callback, $flags = 0, $tag = '', array $arguments = [])
    {
        $noWait = (bool) ($flags & Consumer::FLAG_NO_WAIT);

        // Only a missing tag is replaced. empty() would also treat the
        // legitimate tag "0" as missing and silently overwrite it.
        if ($noWait && (string) $tag === '') {
            $tag = uniqid('php-consumer-');
        }

        // Flag bits are normalised to booleans, the same way publish() does it.
        $this->send(new BasicConsume(
            $this->id,
            0,
            $queue,
            $tag,
            (bool) ($flags & Consumer::FLAG_NO_LOCAL),
            (bool) ($flags & Consumer::FLAG_NO_ACK),
            (bool) ($flags & Consumer::FLAG_EXCLUSIVE),
            $noWait,
            $arguments
        ));

        if (!$noWait) {
            // The tag in the reply is authoritative for the registration below.
            $tag = $this->wait(BasicConsumeOk::class)
                ->getConsumerTag();
        }

        $this->consumers[$tag] = $callback;

        return new Consumer($this, $tag);
    }
210
211
    /**
212
     * {@inheritdoc}
213
     */
214 7
    public function get($queue, $withAck = true)
215
    {
216
        /** @var BasicGetOk|BasicGetEmpty $frame */
217 7
        $frame = $this->send(new BasicGet($this->id, 0, $queue, !$withAck))
218 7
            ->wait([BasicGetOk::class, BasicGetEmpty::class]);
219
220 7
        if ($frame instanceof BasicGetEmpty) {
0 ignored issues
show
Bug introduced by
The class ButterAMQP\Framing\Method\BasicGetEmpty does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
221 2
            return null;
222
        }
223
224
        /** @var Header $header */
225 5
        $header = $this->wait(Header::class);
226 5
        $content = '';
227
228 5
        while ($header->getSize() > strlen($content)) {
229 3
            $content .= $this->wait(Content::class)->getData();
230 3
        }
231
232 5
        return new Delivery(
233 5
            $this,
234 5
            '',
235 5
            $frame->getDeliveryTag(),
236 5
            $frame->isRedelivered(),
237 5
            $frame->getExchange(),
238 5
            $frame->getRoutingKey(),
239 5
            $content,
240 5
            $header->getProperties()
241 5
        );
242
    }
243
244
    /**
     * {@inheritdoc}
     *
     * Sends BasicRecover with the requeue flag and waits for BasicRecoverOk.
     */
    public function recover($requeue = true)
    {
        $request = new BasicRecover($this->id, $requeue);

        $this->send($request);
        $this->wait(BasicRecoverOk::class);

        return $this;
    }
254
255
    /**
     * {@inheritdoc}
     *
     * Sends BasicCancel for the consumer tag and removes the registered
     * callback. Unless FLAG_NO_WAIT is set, the callback is removed only after
     * BasicCancelOk has been received.
     *
     * @param string $tag   Consumer tag to cancel
     * @param int    $flags Bit mask of Consumer::FLAG_* constants; only FLAG_NO_WAIT is read here
     *
     * @return $this
     */
    public function cancel($tag, $flags = 0)
    {
        $noWait = $flags & Consumer::FLAG_NO_WAIT;

        $this->send(new BasicCancel($this->id, $tag, $noWait));

        if (!$noWait) {
            // AMQP allows the broker to keep delivering messages between
            // basic.cancel and basic.cancel-ok. If such a delivery is dispatched
            // to this channel while waiting, dropping the callback first would
            // make onBasicDeliver() throw UnknownConsumerTagException.
            $this->wait(BasicCancelOk::class);
        }

        unset($this->consumers[$tag]);

        return $this;
    }
272
273
    /**
     * {@inheritdoc}
     *
     * Sends three frames in order: BasicPublish, a Header carrying the body
     * length and message properties, and a single Content frame with the body.
     */
    public function publish(Message $message, $exchange = '', $routingKey = '', $flags = 0)
    {
        $mandatory = (bool) ($flags & Message::FLAG_MANDATORY);
        $immediate = (bool) ($flags & Message::FLAG_IMMEDIATE);

        $this->send(new BasicPublish($this->id, 0, $exchange, $routingKey, $mandatory, $immediate));

        $payload = $message->getBody();
        $header = new Header($this->id, 60, 0, strlen($payload), $message->getProperties());

        $this->send($header);
        $this->send(new Content($this->id, $payload));

        return $this;
    }
294
295
    /**
     * {@inheritdoc}
     *
     * Sends BasicAck for the delivery tag; no reply is awaited.
     */
    public function ack($deliveryTag, $multiple = false)
    {
        $frame = new BasicAck($this->id, $deliveryTag, $multiple);

        return $this->send($frame);
    }
304
305
    /**
     * {@inheritdoc}
     *
     * Sends BasicNack when $multiple is truthy, BasicReject otherwise.
     * No reply is awaited in either case.
     */
    public function reject($deliveryTag, $requeue = true, $multiple = false)
    {
        if ($multiple) {
            $frame = new BasicNack($this->id, $deliveryTag, $multiple, $requeue);
        } else {
            $frame = new BasicReject($this->id, $deliveryTag, $requeue);
        }

        $this->send($frame);

        return $this;
    }
315
316
    /**
     * {@inheritdoc}
     *
     * Stores the handler that onBasicReturn() calls with a Returned object.
     * Any previously stored handler is replaced.
     */
    public function onReturn(callable $callable)
    {
        $this->returnCallable = $callable;

        return $this;
    }
325
326
    /**
     * {@inheritdoc}
     *
     * Stores the confirm handler, sends ConfirmSelect and, unless $noWait is
     * truthy, waits for ConfirmSelectOk. The channel mode then becomes
     * MODE_CONFIRM.
     */
    public function selectConfirm(callable $callable, $noWait = false)
    {
        // The handler is stored before anything is sent.
        $this->confirmCallable = $callable;

        $request = new ConfirmSelect($this->id, $noWait);
        $this->send($request);

        $expectReply = !$noWait;
        if ($expectReply) {
            $this->wait(ConfirmSelectOk::class);
        }

        $this->mode = self::MODE_CONFIRM;

        return $this;
    }
343
344
    /**
     * {@inheritdoc}
     *
     * Sends TxSelect, waits for TxSelectOk and switches the channel mode to MODE_TX.
     */
    public function selectTx()
    {
        $request = new TxSelect($this->id);

        $this->send($request);
        $this->wait(TxSelectOk::class);

        $this->mode = self::MODE_TX;

        return $this;
    }
356
357
    /**
     * {@inheritdoc}
     *
     * Sends TxCommit and waits for TxCommitOk.
     *
     * @return $this
     *
     * @throws TransactionNotSelectedException when the channel mode is not MODE_TX
     */
    public function txCommit()
    {
        $this->assertTransactionMode();

        $this->send(new TxCommit($this->id))
            ->wait(TxCommitOk::class);

        // Fluent return, like the other command methods of this class.
        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Sends TxRollback and waits for TxRollbackOk.
     *
     * @return $this
     *
     * @throws TransactionNotSelectedException when the channel mode is not MODE_TX
     */
    public function txRollback()
    {
        $this->assertTransactionMode();

        $this->send(new TxRollback($this->id))
            ->wait(TxRollbackOk::class);

        // Fluent return, like the other command methods of this class.
        return $this;
    }

    /**
     * Guard shared by txCommit() and txRollback().
     *
     * @throws TransactionNotSelectedException when the channel mode is not MODE_TX
     */
    private function assertTransactionMode()
    {
        if ($this->mode != self::MODE_TX) {
            throw new TransactionNotSelectedException('Channel is not in transaction mode. Use Channel::selectTx() to select transaction mode on this channel.');
        }
    }
386
387
    /**
     * {@inheritdoc}
     *
     * Reports whether a callback is registered under the tag. The tag is cast
     * to string before the lookup.
     */
    public function hasConsumer($tag)
    {
        $key = (string) $tag;

        return isset($this->consumers[$key]);
    }
394
395
    /**
     * {@inheritdoc}
     *
     * Returns the keys of the registered consumer callbacks.
     */
    public function getConsumerTags()
    {
        $tags = array_keys($this->consumers);

        return $tags;
    }
402
403
    /**
404
     * @return int
405
     */
406 3
    public function getStatus()
407
    {
408 3
        return $this->status;
409
    }
410
411
    /**
     * Returns the current channel mode.
     *
     * @return int One of the MODE_* constants of this class
     */
    public function getMode()
    {
        $mode = $this->mode;

        return $mode;
    }
418
419
    /**
     * Hands a frame to the wire for sending.
     *
     * Returns the channel itself so that a wait() call can be chained onto it.
     *
     * @param Frame $frame Frame to send
     *
     * @return $this
     */
    private function send(Frame $frame)
    {
        $wire = $this->wire;
        $wire->send($frame);

        return $this;
    }
432
433
    /**
     * Asks the wire to wait on this channel's id for a frame of the given type(s).
     *
     * @param string|array $type Frame class name, or a list of acceptable class names
     *
     * @return Frame Whatever the wire's wait() returns
     */
    private function wait($type)
    {
        $frame = $this->wire->wait($this->id, $type);

        return $frame;
    }
442
443
    /**
444
     * @param Frame $frame
445
     */
446 29
    public function dispatch(Frame $frame)
447
    {
448 29
        if ($frame instanceof ChannelClose) {
0 ignored issues
show
Bug introduced by
The class ButterAMQP\Framing\Method\ChannelClose does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
449 3
            $this->onChannelClose($frame);
450
        }
451
452 28
        if ($frame instanceof ChannelFlow) {
0 ignored issues
show
Bug introduced by
The class ButterAMQP\Framing\Method\ChannelFlow does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
453 1
            $this->onChannelFlow($frame);
454 1
        }
455
456 28
        if ($frame instanceof BasicDeliver) {
0 ignored issues
show
Bug introduced by
The class ButterAMQP\Framing\Method\BasicDeliver does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
457 6
            $this->onBasicDeliver($frame);
458 5
        }
459
460 27
        if ($frame instanceof BasicReturn) {
0 ignored issues
show
Bug introduced by
The class ButterAMQP\Framing\Method\BasicReturn does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
461 4
            $this->onBasicReturn($frame);
462 2
        }
463
464 26
        if ($frame instanceof BasicAck) {
0 ignored issues
show
Bug introduced by
The class ButterAMQP\Framing\Method\BasicAck does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
465 3
            $this->onBasicAck($frame);
466 2
        }
467
468 25
        if ($frame instanceof BasicNack) {
0 ignored issues
show
Bug introduced by
The class ButterAMQP\Framing\Method\BasicNack does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
469 2
            $this->onBasicNack($frame);
470 1
        }
471
472 24
        if ($frame instanceof BasicCancel) {
0 ignored issues
show
Bug introduced by
The class ButterAMQP\Framing\Method\BasicCancel does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
473 3
            $this->onBasicCancel($frame);
474 3
        }
475 24
    }
476
477
    /**
     * Handles a BasicDeliver frame.
     *
     * Waits for the Header and for as many Content frames as needed to reach
     * the size announced by the header, then passes a Delivery to the callback
     * registered under the frame's consumer tag.
     *
     * @param BasicDeliver $frame
     *
     * @throws UnknownConsumerTagException when no callback is registered for the tag
     */
    private function onBasicDeliver(BasicDeliver $frame)
    {
        /** @var Header $header */
        $header = $this->wait(Header::class);

        $body = '';
        while (strlen($body) < $header->getSize()) {
            $body .= $this->wait(Content::class)->getData();
        }

        // The tag is checked only after header and content have been read.
        $tag = $frame->getConsumerTag();

        if (!isset($this->consumers[$tag])) {
            throw new UnknownConsumerTagException(sprintf('Consumer with tag "%s" does not exist', $tag));
        }

        $delivery = new Delivery(
            $this,
            $tag,
            $frame->getDeliveryTag(),
            $frame->isRedelivered(),
            $frame->getExchange(),
            $frame->getRoutingKey(),
            $body,
            $header->getProperties()
        );

        $handler = $this->consumers[$tag];
        call_user_func($handler, $delivery);
    }
512
513
    /**
     * Handles a BasicReturn frame.
     *
     * Waits for the Header and for as many Content frames as needed to reach
     * the size announced by the header, then passes a Returned object to the
     * handler registered through onReturn().
     *
     * @param BasicReturn $frame
     *
     * @throws NoReturnException when no return handler has been registered
     */
    private function onBasicReturn(BasicReturn $frame)
    {
        /** @var Header $header */
        $header = $this->wait(Header::class);
        $content = '';

        while ($header->getSize() > strlen($content)) {
            $content .= $this->wait(Content::class)->getData();
        }

        // The handler is checked only after header and content have been read.
        if (!$this->returnCallable) {
            // The previous concatenation produced "method, , or remove".
            throw new NoReturnException(
                'A message was returned but there is no return handler. '.
                'Make sure you setup a handler for returned messages using Channel::onReturn method, '.
                'or remove MANDATORY and IMMEDIATE flags when publishing messages.'
            );
        }

        $returned = new Returned(
            $frame->getReplyCode(),
            $frame->getReplyText(),
            $frame->getExchange(),
            $frame->getRoutingKey(),
            $content,
            $header->getProperties()
        );

        call_user_func($this->returnCallable, $returned);
    }
547
548
    /**
     * Handles a BasicAck frame by reporting a positive Confirm to the confirm handler.
     *
     * @param BasicAck $frame
     *
     * @throws \RuntimeException when no confirm handler is set
     */
    private function onBasicAck(BasicAck $frame)
    {
        $this->notifyConfirm(true, $frame);
    }

    /**
     * Handles a BasicNack frame by reporting a negative Confirm to the confirm handler.
     *
     * @param BasicNack $frame
     *
     * @throws \RuntimeException when no confirm handler is set
     */
    private function onBasicNack(BasicNack $frame)
    {
        $this->notifyConfirm(false, $frame);
    }

    /**
     * Shared implementation of onBasicAck() and onBasicNack().
     *
     * Builds a Confirm from the frame's delivery tag and "multiple" flag and
     * passes it to the handler stored by selectConfirm().
     *
     * @param bool               $ok    true for an ack, false for a nack
     * @param BasicAck|BasicNack $frame Frame received from the wire
     *
     * @throws \RuntimeException when no confirm handler is set
     */
    private function notifyConfirm($ok, $frame)
    {
        if (!$this->confirmCallable) {
            throw new \RuntimeException(
                'Something is wrong: channel is in confirm mode, but confirm callable is not set'
            );
        }

        call_user_func($this->confirmCallable, new Confirm($ok, $frame->getDeliveryTag(), $frame->isMultiple()));
    }
575
576
    /**
     * Handles a BasicCancel frame received from the wire.
     *
     * Removes the callback registered under the frame's consumer tag and
     * replies with BasicCancelOk unless the frame has its no-wait flag set.
     *
     * @param BasicCancel $frame
     */
    private function onBasicCancel(BasicCancel $frame)
    {
        $tag = $frame->getConsumerTag();

        unset($this->consumers[$tag]);

        if ($frame->isNoWait()) {
            return;
        }

        $this->send(new BasicCancelOk($this->id, $tag));
    }
587
588
    /**
     * Handles a ChannelFlow frame received from the wire.
     *
     * Replies with ChannelFlowOk echoing the requested state, then stores that
     * state as the channel status (ready or inactive).
     *
     * @param ChannelFlow $frame
     */
    private function onChannelFlow(ChannelFlow $frame)
    {
        $reply = new ChannelFlowOk($this->id, $frame->isActive());
        $this->send($reply);

        if ($frame->isActive()) {
            $this->status = self::STATUS_READY;
        } else {
            $this->status = self::STATUS_INACTIVE;
        }
    }
597
598
    /**
     * Handles a ChannelClose frame received from the wire.
     *
     * Replies with ChannelCloseOk, marks the channel closed and then always
     * throws an exception built from the frame's reply text and reply code.
     *
     * @param ChannelClose $frame
     *
     * @throws AMQPException
     */
    private function onChannelClose(ChannelClose $frame)
    {
        $reply = new ChannelCloseOk($this->id);
        $this->send($reply);

        $this->status = self::STATUS_CLOSED;

        // The exception is built only after the reply was sent and the status updated.
        $exception = AMQPException::make($frame->getReplyText(), $frame->getReplyCode());

        throw $exception;
    }
611
}
612