Channel   C
last analyzed

Complexity

Total Complexity 59

Size/Duplication

Total Lines 553
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 11

Test Coverage

Coverage 98.68%

Importance

Changes 0
Metric Value
wmc 59
lcom 1
cbo 11
dl 0
loc 553
ccs 224
cts 227
cp 0.9868
rs 6.1904
c 0
b 0
f 0

35 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
A serve() 0 6 1
A close() 0 9 1
A qos() 0 7 1
A exchange() 0 4 1
A queue() 0 4 1
B consume() 0 27 4
A open() 0 15 2
A flow() 0 12 2
B get() 0 25 2
A recover() 0 7 1
A cancel() 0 14 2
A publish() 0 18 1
A ack() 0 6 1
A reject() 0 7 2
A onReturn() 0 6 1
A selectConfirm() 0 14 2
A selectTx() 0 9 1
A txCommit() 0 11 2
A txRollback() 0 11 2
A hasConsumer() 0 4 1
A getConsumerTags() 0 4 1
A getStatus() 0 4 1
A send() 0 6 1
B dispatch() 0 18 8
A onBasicDeliver() 0 23 2
A onBasicReturn() 0 22 2
A onBasicAck() 0 4 1
A onBasicNack() 0 4 1
A confirmPublishing() 0 10 2
A onBasicCancel() 0 8 2
A onChannelFlow() 0 6 2
A onChannelClose() 0 8 1
A readHeader() 0 4 1
A readContent() 0 11 2

How to fix   Complexity   

Complex Class

Complex classes like Channel often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Channel, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\ChannelInterface;
6
use ButterAMQP\Confirm;
7
use ButterAMQP\Delivery;
8
use ButterAMQP\Exception\AMQPException;
9
use ButterAMQP\Exception\NoReturnException;
10
use ButterAMQP\Exception\TransactionNotSelectedException;
11
use ButterAMQP\Exception\UnknownConsumerTagException;
12
use ButterAMQP\AMQP091\Framing\Content;
13
use ButterAMQP\AMQP091\Framing\Frame;
14
use ButterAMQP\AMQP091\Framing\Header;
15
use ButterAMQP\AMQP091\Framing\Method\BasicAck;
16
use ButterAMQP\AMQP091\Framing\Method\BasicCancel;
17
use ButterAMQP\AMQP091\Framing\Method\BasicCancelOk;
18
use ButterAMQP\AMQP091\Framing\Method\BasicConsume;
19
use ButterAMQP\AMQP091\Framing\Method\BasicConsumeOk;
20
use ButterAMQP\AMQP091\Framing\Method\BasicDeliver;
21
use ButterAMQP\AMQP091\Framing\Method\BasicGet;
22
use ButterAMQP\AMQP091\Framing\Method\BasicGetEmpty;
23
use ButterAMQP\AMQP091\Framing\Method\BasicGetOk;
24
use ButterAMQP\AMQP091\Framing\Method\BasicNack;
25
use ButterAMQP\AMQP091\Framing\Method\BasicPublish;
26
use ButterAMQP\AMQP091\Framing\Method\BasicQos;
27
use ButterAMQP\AMQP091\Framing\Method\BasicQosOk;
28
use ButterAMQP\AMQP091\Framing\Method\BasicRecover;
29
use ButterAMQP\AMQP091\Framing\Method\BasicRecoverOk;
30
use ButterAMQP\AMQP091\Framing\Method\BasicReject;
31
use ButterAMQP\AMQP091\Framing\Method\BasicReturn;
32
use ButterAMQP\AMQP091\Framing\Method\ChannelClose;
33
use ButterAMQP\AMQP091\Framing\Method\ChannelCloseOk;
34
use ButterAMQP\AMQP091\Framing\Method\ChannelFlow;
35
use ButterAMQP\AMQP091\Framing\Method\ChannelFlowOk;
36
use ButterAMQP\AMQP091\Framing\Method\ChannelOpen;
37
use ButterAMQP\AMQP091\Framing\Method\ChannelOpenOk;
38
use ButterAMQP\AMQP091\Framing\Method\ConfirmSelect;
39
use ButterAMQP\AMQP091\Framing\Method\ConfirmSelectOk;
40
use ButterAMQP\AMQP091\Framing\Method\TxCommit;
41
use ButterAMQP\AMQP091\Framing\Method\TxCommitOk;
42
use ButterAMQP\AMQP091\Framing\Method\TxRollback;
43
use ButterAMQP\AMQP091\Framing\Method\TxRollbackOk;
44
use ButterAMQP\AMQP091\Framing\Method\TxSelect;
45
use ButterAMQP\AMQP091\Framing\Method\TxSelectOk;
46
use ButterAMQP\Message;
47
use ButterAMQP\Returned;
48
49
/**
 * AMQP 0.9.1 channel.
 *
 * Sends method, header and content frames through the wire it was given and
 * reacts to the frames the wire hands back through {@see Channel::dispatch()}.
 */
class Channel implements ChannelInterface, WireSubscriberInterface
{
    const STATUS_CLOSED = 0;
    const STATUS_READY = 1;
    const STATUS_INACTIVE = 2;

    const MODE_NORMAL = 0;
    const MODE_CONFIRM = 1;
    const MODE_TX = 2;

    /**
     * Channel number; passed as first argument to every frame built here.
     *
     * @var int
     */
    private $id;

    /**
     * Wire used to send frames and to wait for replies.
     *
     * @var WireInterface
     */
    private $wire;

    /**
     * One of the STATUS_* constants.
     *
     * @var int
     */
    private $status = self::STATUS_CLOSED;

    /**
     * One of the MODE_* constants.
     *
     * @var int
     */
    private $mode = self::MODE_NORMAL;

    /**
     * Consumer callbacks indexed by consumer tag.
     *
     * @var callable[]
     */
    private $consumers = [];

    /**
     * Handler for returned messages, unset until onReturn() is called.
     *
     * @var callable|null
     */
    private $returnCallable;

    /**
     * Handler for publisher confirms, unset until selectConfirm() is called.
     *
     * @var callable|null
     */
    private $confirmCallable;

    /**
     * @param WireInterface $wire
     * @param int           $id
     */
    public function __construct(WireInterface $wire, $id)
    {
        $this->id = $id;
        $this->wire = $wire;
    }

    /**
     * Subscribes the channel to the wire, sends ChannelOpen and waits for
     * ChannelOpenOk. Does nothing when the channel is not in closed status.
     *
     * {@inheritdoc}
     */
    public function open()
    {
        if ($this->status !== self::STATUS_CLOSED) {
            return $this;
        }

        $this->wire->subscribe($this->id, $this);
        $this->wire->send(new ChannelOpen($this->id, ''));
        $this->wire->wait($this->id, ChannelOpenOk::class);

        $this->status = self::STATUS_READY;
        $this->mode = self::MODE_NORMAL;

        return $this;
    }

    /**
     * Sends ChannelFlow and updates the status from the ChannelFlowOk reply.
     *
     * {@inheritdoc}
     */
    public function flow($active)
    {
        $this->wire->send(new ChannelFlow($this->id, $active));

        /** @var ChannelFlowOk $frame */
        $frame = $this->wire->wait($this->id, ChannelFlowOk::class);

        // The status follows what the reply reports, not what was requested.
        $this->status = $frame->isActive() ? self::STATUS_READY : self::STATUS_INACTIVE;

        return $this;
    }

    /**
     * Asks the wire to process the next frame.
     *
     * {@inheritdoc}
     */
    public function serve($blocking = true)
    {
        $this->wire->next($blocking);

        return $this;
    }

    /**
     * Sends ChannelClose, waits for ChannelCloseOk and marks the channel closed.
     *
     * {@inheritdoc}
     */
    public function close()
    {
        $this->wire->send(new ChannelClose($this->id, 0, '', 0, 0));
        $this->wire->wait($this->id, ChannelCloseOk::class);

        $this->status = self::STATUS_CLOSED;

        return $this;
    }

    /**
     * Sends BasicQos and waits for BasicQosOk.
     *
     * {@inheritdoc}
     */
    public function qos($prefetchSize, $prefetchCount, $globally = false)
    {
        $this->wire->send(new BasicQos($this->id, $prefetchSize, $prefetchCount, $globally));
        $this->wire->wait($this->id, BasicQosOk::class);

        return $this;
    }

    /**
     * Creates an Exchange object bound to this channel's wire and id.
     *
     * {@inheritdoc}
     */
    public function exchange($name)
    {
        return new Exchange($this->wire, $this->id, $name);
    }

    /**
     * Creates a Queue object bound to this channel's wire and id.
     *
     * {@inheritdoc}
     */
    public function queue($name = '')
    {
        return new Queue($this->wire, $this->id, $name);
    }

    /**
     * Sends BasicConsume and registers the callback under the consumer tag.
     *
     * {@inheritdoc}
     */
    public function consume($queue, callable $callback, $flags = 0, $tag = '', array $arguments = [])
    {
        $noWait = (bool) ($flags & Consumer::FLAG_NO_WAIT);

        // In no-wait mode no BasicConsumeOk is awaited, so the tag has to be
        // known locally. Only a really empty tag is replaced: "0" is kept.
        if ($noWait && '' === (string) $tag) {
            $tag = uniqid('php-consumer-');
        }

        $this->wire->send(new BasicConsume(
            $this->id,
            0,
            $queue,
            $tag,
            (bool) ($flags & Consumer::FLAG_NO_LOCAL),
            (bool) ($flags & Consumer::FLAG_NO_ACK),
            (bool) ($flags & Consumer::FLAG_EXCLUSIVE),
            $noWait,
            $arguments
        ));

        if (!$noWait) {
            /** @var BasicConsumeOk $reply */
            $reply = $this->wire->wait($this->id, BasicConsumeOk::class);

            // The tag from the reply wins over the one that was requested.
            $tag = $reply->getConsumerTag();
        }

        $this->consumers[$tag] = $callback;

        return new Consumer($this, $tag);
    }

    /**
     * Sends BasicGet and returns the fetched message as a Delivery, or null
     * when the reply is BasicGetEmpty.
     *
     * {@inheritdoc}
     */
    public function get($queue, $withAck = true)
    {
        // The frame takes a "no ack" argument, hence the negation.
        $this->wire->send(new BasicGet($this->id, 0, $queue, !$withAck));

        /** @var BasicGetOk|BasicGetEmpty $frame */
        $frame = $this->wire->wait($this->id, [BasicGetOk::class, BasicGetEmpty::class]);

        if ($frame instanceof BasicGetEmpty) {
            return null;
        }

        $header = $this->readHeader();
        $content = $this->readContent($header->getSize());

        return new Delivery(
            $this,
            '',
            $frame->getDeliveryTag(),
            $frame->isRedelivered(),
            $frame->getExchange(),
            $frame->getRoutingKey(),
            $content,
            $header->getProperties()
        );
    }

    /**
     * Sends BasicRecover and waits for BasicRecoverOk.
     *
     * {@inheritdoc}
     */
    public function recover($requeue = true)
    {
        $this->wire->send(new BasicRecover($this->id, $requeue));
        $this->wire->wait($this->id, BasicRecoverOk::class);

        return $this;
    }

    /**
     * Sends BasicCancel, forgets the consumer callback and, unless the no-wait
     * flag is set, waits for BasicCancelOk.
     *
     * {@inheritdoc}
     */
    public function cancel($tag, $flags = 0)
    {
        $noWait = (bool) ($flags & Consumer::FLAG_NO_WAIT);

        $this->wire->send(new BasicCancel($this->id, $tag, $noWait));

        // The callback is dropped before the reply is awaited.
        unset($this->consumers[$tag]);

        if (!$noWait) {
            $this->wire->wait($this->id, BasicCancelOk::class);
        }

        return $this;
    }

    /**
     * Sends a message as three frames: BasicPublish, Header and Content.
     *
     * {@inheritdoc}
     */
    public function publish(Message $message, $exchange = '', $routingKey = '', $flags = 0)
    {
        $this->wire->send(new BasicPublish(
            $this->id,
            0,
            $exchange,
            $routingKey,
            (bool) ($flags & Message::FLAG_MANDATORY),
            (bool) ($flags & Message::FLAG_IMMEDIATE)
        ));

        $body = $message->getBody();

        // strlen() is intentional: the header carries the body size in bytes.
        // NOTE(review): 60 is presumably the AMQP "basic" class id - confirm against Header.
        $this->wire->send(new Header($this->id, 60, 0, strlen($body), $message->getProperties()));
        $this->wire->send(new Content($this->id, $body));

        return $this;
    }

    /**
     * Sends BasicAck; no reply is awaited.
     *
     * {@inheritdoc}
     */
    public function ack($deliveryTag, $multiple = false)
    {
        $this->wire->send(new BasicAck($this->id, $deliveryTag, $multiple));

        return $this;
    }

    /**
     * Sends BasicNack when $multiple is truthy, BasicReject otherwise; no reply
     * is awaited.
     *
     * {@inheritdoc}
     */
    public function reject($deliveryTag, $requeue = true, $multiple = false)
    {
        if ($multiple) {
            $this->wire->send(new BasicNack($this->id, $deliveryTag, $multiple, $requeue));
        } else {
            $this->wire->send(new BasicReject($this->id, $deliveryTag, $requeue));
        }

        return $this;
    }

    /**
     * Registers the handler invoked with a Returned object for every
     * BasicReturn frame dispatched to this channel.
     *
     * {@inheritdoc}
     */
    public function onReturn(callable $callable)
    {
        $this->returnCallable = $callable;

        return $this;
    }

    /**
     * Registers the confirm handler, sends ConfirmSelect and switches the
     * channel to confirm mode. ConfirmSelectOk is awaited unless $noWait is set.
     *
     * {@inheritdoc}
     */
    public function selectConfirm(callable $callable, $noWait = false)
    {
        // The handler is stored before anything is sent.
        $this->confirmCallable = $callable;

        $this->wire->send(new ConfirmSelect($this->id, $noWait));

        if (!$noWait) {
            $this->wire->wait($this->id, ConfirmSelectOk::class);
        }

        $this->mode = self::MODE_CONFIRM;

        return $this;
    }

    /**
     * Sends TxSelect, waits for TxSelectOk and switches the channel to
     * transaction mode.
     *
     * {@inheritdoc}
     */
    public function selectTx()
    {
        $this->wire->send(new TxSelect($this->id));
        $this->wire->wait($this->id, TxSelectOk::class);

        $this->mode = self::MODE_TX;

        return $this;
    }

    /**
     * Sends TxCommit and waits for TxCommitOk.
     *
     * {@inheritdoc}
     *
     * @return $this
     *
     * @throws TransactionNotSelectedException when the channel is not in transaction mode
     */
    public function txCommit()
    {
        $this->guardTransactionMode();

        $this->wire->send(new TxCommit($this->id));
        $this->wire->wait($this->id, TxCommitOk::class);

        return $this;
    }

    /**
     * Sends TxRollback and waits for TxRollbackOk.
     *
     * {@inheritdoc}
     *
     * @return $this
     *
     * @throws TransactionNotSelectedException when the channel is not in transaction mode
     */
    public function txRollback()
    {
        $this->guardTransactionMode();

        $this->wire->send(new TxRollback($this->id));
        $this->wire->wait($this->id, TxRollbackOk::class);

        return $this;
    }

    /**
     * Tells whether a callback is registered under the given consumer tag.
     *
     * {@inheritdoc}
     */
    public function hasConsumer($tag)
    {
        return isset($this->consumers[(string) $tag]);
    }

    /**
     * Returns the tags of all registered consumer callbacks.
     *
     * {@inheritdoc}
     */
    public function getConsumerTags()
    {
        return array_keys($this->consumers);
    }

    /**
     * @return int one of the STATUS_* constants
     */
    public function getStatus()
    {
        return $this->status;
    }

    /**
     * Sends frame to the server.
     *
     * Not called from anywhere inside this class at the moment.
     *
     * @param Frame $frame
     *
     * @return $this
     */
    private function send(Frame $frame)
    {
        $this->wire->send($frame);

        return $this;
    }

    /**
     * Routes a frame to the matching handler. Frames of any other type are
     * ignored.
     *
     * @param Frame $frame
     */
    public function dispatch(Frame $frame)
    {
        if ($frame instanceof ChannelClose) {
            $this->onChannelClose($frame);
        } elseif ($frame instanceof ChannelFlow) {
            $this->onChannelFlow($frame);
        } elseif ($frame instanceof BasicDeliver) {
            $this->onBasicDeliver($frame);
        } elseif ($frame instanceof BasicReturn) {
            $this->onBasicReturn($frame);
        } elseif ($frame instanceof BasicAck) {
            $this->onBasicAck($frame);
        } elseif ($frame instanceof BasicNack) {
            $this->onBasicNack($frame);
        } elseif ($frame instanceof BasicCancel) {
            $this->onBasicCancel($frame);
        }
    }

    /**
     * Throws unless the channel is in transaction mode.
     *
     * @throws TransactionNotSelectedException
     */
    private function guardTransactionMode()
    {
        if ($this->mode !== self::MODE_TX) {
            throw new TransactionNotSelectedException('Channel is not in transaction mode. Use Channel::selectTx() to select transaction mode on this channel.');
        }
    }

    /**
     * Reads the message that follows the frame and passes it, wrapped in a
     * Delivery, to the callback registered for the frame's consumer tag.
     *
     * @param BasicDeliver $frame
     *
     * @throws UnknownConsumerTagException when no callback is registered for the tag
     */
    private function onBasicDeliver(BasicDeliver $frame)
    {
        // Header and content are read before the tag is checked, so they are
        // consumed from the wire even when the delivery cannot be handled.
        $header = $this->readHeader();
        $content = $this->readContent($header->getSize());

        $tag = $frame->getConsumerTag();

        if (!isset($this->consumers[$tag])) {
            throw new UnknownConsumerTagException(sprintf(
                'Consumer with tag "%s" does not exist',
                $tag
            ));
        }

        call_user_func($this->consumers[$tag], new Delivery(
            $this,
            $tag,
            $frame->getDeliveryTag(),
            $frame->isRedelivered(),
            $frame->getExchange(),
            $frame->getRoutingKey(),
            $content,
            $header->getProperties()
        ));
    }

    /**
     * Reads the returned message and passes it, wrapped in a Returned object,
     * to the handler registered with onReturn().
     *
     * @param BasicReturn $frame
     *
     * @throws NoReturnException when no return handler is registered
     */
    private function onBasicReturn(BasicReturn $frame)
    {
        // Header and content are read before the handler check.
        $header = $this->readHeader();
        $content = $this->readContent($header->getSize());

        if (!$this->returnCallable) {
            throw new NoReturnException(
                'A message was returned but there is no return handler. Make sure you setup a handler for returned '.
                'messages using Channel::onReturn method, or remove MANDATORY and IMMEDIATE flags when publishing '.
                'messages.'
            );
        }

        call_user_func($this->returnCallable, new Returned(
            $frame->getReplyCode(),
            $frame->getReplyText(),
            $frame->getExchange(),
            $frame->getRoutingKey(),
            $content,
            $header->getProperties()
        ));
    }

    /**
     * Reports a positive publisher confirm to the confirm handler.
     *
     * @param BasicAck $frame
     */
    private function onBasicAck(BasicAck $frame)
    {
        $this->confirmPublishing(true, $frame->getDeliveryTag(), $frame->isMultiple());
    }

    /**
     * Reports a negative publisher confirm to the confirm handler.
     *
     * @param BasicNack $frame
     */
    private function onBasicNack(BasicNack $frame)
    {
        $this->confirmPublishing(false, $frame->getDeliveryTag(), $frame->isMultiple());
    }

    /**
     * Invokes the confirm handler with a Confirm object.
     *
     * @param bool  $ok       true for BasicAck, false for BasicNack
     * @param mixed $tag      delivery tag as returned by the frame's getDeliveryTag()
     * @param bool  $multiple
     *
     * @throws \RuntimeException when no confirm handler is registered
     */
    private function confirmPublishing($ok, $tag, $multiple)
    {
        if (!$this->confirmCallable) {
            throw new \RuntimeException(
                'Something is wrong: channel is in confirm mode, but confirm callable is not set'
            );
        }

        call_user_func($this->confirmCallable, new Confirm($ok, $tag, $multiple));
    }

    /**
     * Forgets the cancelled consumer and replies with BasicCancelOk unless the
     * frame has its no-wait flag set.
     *
     * @param BasicCancel $frame
     */
    private function onBasicCancel(BasicCancel $frame)
    {
        $tag = $frame->getConsumerTag();

        unset($this->consumers[$tag]);

        if (!$frame->isNoWait()) {
            $this->wire->send(new BasicCancelOk($this->id, $tag));
        }
    }

    /**
     * Replies with ChannelFlowOk and updates the status accordingly.
     *
     * @param ChannelFlow $frame
     */
    private function onChannelFlow(ChannelFlow $frame)
    {
        $active = $frame->isActive();

        $this->wire->send(new ChannelFlowOk($this->id, $active));

        $this->status = $active ? self::STATUS_READY : self::STATUS_INACTIVE;
    }

    /**
     * Replies with ChannelCloseOk, marks the channel closed and raises the
     * reason carried by the frame as an exception.
     *
     * @param ChannelClose $frame
     *
     * @throws AMQPException
     */
    private function onChannelClose(ChannelClose $frame)
    {
        $this->wire->send(new ChannelCloseOk($this->id));

        $this->status = self::STATUS_CLOSED;

        throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
    }

    /**
     * Waits for the next Header frame on this channel.
     *
     * @return Header
     */
    private function readHeader()
    {
        return $this->wire->wait($this->id, Header::class);
    }

    /**
     * Concatenates Content frames until at least $size bytes were collected.
     * No frame is awaited when $size is zero.
     *
     * @param int $size
     *
     * @return string
     */
    private function readContent($size)
    {
        $content = '';

        while (strlen($content) < $size) {
            /** @var Content $frame */
            $frame = $this->wire->wait($this->id, Content::class);
            $content .= $frame->getData();
        }

        return $content;
    }
}
602