Consumer::consume()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 7
cts 7
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 6
nc 2
nop 1
crap 2
1
<?php
2
3
namespace Ccovey\RabbitMQ\Consumer;
4
5
use Ccovey\RabbitMQ\ChannelInterface;
6
use Ccovey\RabbitMQ\Connection\ConnectionInterface;
7
use Ccovey\RabbitMQ\Queue;
8
use Ccovey\RabbitMQ\QueueDeclarer;
9
use Ccovey\RabbitMQ\QueuedMessage;
10
use Ccovey\RabbitMQ\QueuedMessageInterface;
11
use PhpAmqpLib\Message\AMQPMessage;
12
13
/**
 * Reads messages from a queue through a single channel taken from the
 * injected connection and hands each delivery to a user-supplied callback.
 */
class Consumer implements ConsumerInterface
{
    /**
     * @var ConnectionInterface
     */
    private $connection;

    /**
     * @var QueueDeclarer
     */
    private $queueDeclarer;

    /**
     * @var ChannelInterface
     */
    private $channel;

    /**
     * Invoked by process() for every delivery; null until setCallback() is called.
     *
     * @var callable|null
     */
    private $callback;

    /**
     * Optional hook invoked by checkRestart() after each processed delivery.
     *
     * @var callable|null
     */
    private $restartCheckCallable;

    /**
     * Stores the collaborators and resolves the channel up front.
     *
     * @param ConnectionInterface $connection    Connection the channel is obtained from.
     * @param QueueDeclarer       $queueDeclarer Declares a queue before it is read from.
     * @param string              $channelId     Forwarded to ConnectionInterface::getChannel().
     */
    public function __construct(ConnectionInterface $connection, QueueDeclarer $queueDeclarer, string $channelId = '')
    {
        $this->connection = $connection;
        $this->queueDeclarer = $queueDeclarer;
        $this->channel = $this->connection->getChannel($channelId);
    }

    /**
     * Sets (or clears, when null) the callback that process() invokes with
     * the wrapped message and the channel.
     *
     * @param callable|null $callback
     */
    public function setCallback(callable $callback = null)
    {
        $this->callback = $callback;
    }

    /**
     * Sets the hook that is called with the queued message after each
     * delivery has been handed to the main callback.
     *
     * @param callable $callable
     */
    public function setRestartCheckCallable(callable $callable)
    {
        $this->restartCheckCallable = $callable;
    }

    /**
     * Declares the consumable's queue, registers process() as its callback,
     * starts consuming on the channel, and then keeps waiting on the channel
     * for as long as it reports registered callbacks.
     *
     * @param Consumable $consumable
     */
    public function consume(Consumable $consumable)
    {
        $this->queueDeclarer->declareQueue($consumable->getQueueName());
        $consumable->setCallback([$this, 'process']);
        $this->channel->consume($consumable);

        while (count($this->channel->getCallbacks())) {
            $this->channel->wait();
        }
    }

    /**
     * Settles a message on the channel: nack when the message reports
     * failure, acknowledge otherwise.
     *
     * @param QueuedMessageInterface $message
     */
    public function complete(QueuedMessageInterface $message)
    {
        if ($message->isFailed()) {
            $this->channel->nack($message->getDeliveryTag());
        } else {
            $this->channel->acknowledge($message->getDeliveryTag());
        }
    }

    /**
     * Declares the consumable's queue and fetches a single message from the
     * channel.
     *
     * @param Consumable $consumable
     *
     * @return QueuedMessage|null The wrapped message, or null when the channel
     *                            returned nothing (a falsy value).
     */
    public function getMessage(Consumable $consumable)
    {
        $this->queueDeclarer->declareQueue($consumable->getQueueName());
        $message = $this->channel->getMessage($consumable);

        if ($message) {
            return new QueuedMessage($message);
        }

        return null;
    }

    /**
     * @return ChannelInterface The channel resolved in the constructor.
     */
    public function getChannel() : ChannelInterface
    {
        return $this->channel;
    }

    /**
     * Delivery handler registered by consume(): wraps the raw AMQP message,
     * passes it together with the channel to the configured callback, then
     * runs the optional restart check.
     *
     * @param AMQPMessage $message
     *
     * @throws \LogicException When no callback has been configured.
     */
    public function process(AMQPMessage $message)
    {
        // The callback is null until setCallback() is called (and may be reset
        // to null through it). Fail with an explicit message rather than an
        // opaque warning/TypeError from invoking a non-callable.
        if ($this->callback === null) {
            throw new \LogicException('No callback has been set; call setCallback() before consuming messages.');
        }

        $queuedMessage = new QueuedMessage($message);

        ($this->callback)($queuedMessage, $this->channel);

        $this->checkRestart($queuedMessage);
    }

    /**
     * Asks the channel for the size of the given queue.
     *
     * @param mixed $queue Passed as the first argument to the Queue constructor
     *                     (presumably the queue name; confirm against Queue).
     *
     * @return int
     */
    public function getSize($queue) : int
    {
        // The remaining Queue constructor arguments are fixed here; their
        // meaning is defined by Queue, which is not part of this file.
        $queueParams = new Queue(
            $queue,
            '',
            null,
            true
        );

        return $this->channel->getQueueSize($queueParams);
    }

    /**
     * Invokes the restart-check hook with the message, if one was set.
     *
     * @param QueuedMessageInterface $queuedMessage
     */
    private function checkRestart(QueuedMessageInterface $queuedMessage)
    {
        if ($this->restartCheckCallable) {
            ($this->restartCheckCallable)($queuedMessage);
        }
    }
}
123