Completed
Push — master ( 23e91d...042a18 )
by Cody
02:22
created

Consumer::consume()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 7
cts 7
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 6
nc 2
nop 1
crap 2
1
<?php
2
3
namespace Ccovey\RabbitMQ\Consumer;
4
5
use Ccovey\RabbitMQ\ChannelInterface;
6
use Ccovey\RabbitMQ\Connection\ConnectionInterface;
7
use Ccovey\RabbitMQ\Queue;
8
use Ccovey\RabbitMQ\QueueDeclarer;
9
use Ccovey\RabbitMQ\QueuedMessage;
10
use Ccovey\RabbitMQ\QueuedMessageInterface;
11
use PhpAmqpLib\Message\AMQPMessage;
12
13
/**
 * Consumes messages through a channel obtained from the injected connection.
 *
 * Each delivered AMQP message is wrapped in a QueuedMessage and handed to the
 * callback registered via setCallback(); afterwards the optional restart-check
 * callable is invoked with the same wrapped message.
 */
class Consumer implements ConsumerInterface
{
    /**
     * Connection the channel is taken from.
     *
     * @var ConnectionInterface
     */
    private $connection;

    /**
     * Declares queues before they are consumed or read.
     *
     * @var QueueDeclarer
     */
    private $queueDeclarer;

    /**
     * Channel returned by the connection for the constructor's channel id.
     *
     * @var ChannelInterface
     */
    private $channel;

    /**
     * Handler invoked for every processed message; null until one is set.
     *
     * @var callable|null
     */
    private $callback;

    /**
     * Optional hook invoked after every processed message; null until one is set.
     *
     * @var callable|null
     */
    private $restartCheckCallable;

    /**
     * @param ConnectionInterface $connection    Connection the channel is requested from.
     * @param QueueDeclarer       $queueDeclarer Declarer used before consuming or reading a queue.
     * @param string              $channelId     Identifier passed to ConnectionInterface::getChannel().
     */
    public function __construct(ConnectionInterface $connection, QueueDeclarer $queueDeclarer, string $channelId = '')
    {
        $this->connection = $connection;
        $this->queueDeclarer = $queueDeclarer;
        $this->channel = $connection->getChannel($channelId);
    }

    /**
     * Registers (or clears, when null) the handler that process() calls with
     * the wrapped message and the channel.
     *
     * @param callable|null $callback
     */
    public function setCallback(callable $callback = null)
    {
        $this->callback = $callback;
    }

    /**
     * Registers the callable that is invoked with the wrapped message after
     * each message has been processed.
     *
     * @param callable $callable
     */
    public function setRestartCheckCallable(callable $callable)
    {
        $this->restartCheckCallable = $callable;
    }

    /**
     * Declares the consumable's queue, points the consumable's callback at
     * process(), starts consuming on the channel and then keeps waiting on the
     * channel for as long as it reports registered callbacks.
     *
     * @param Consumable $consumable
     */
    public function consume(Consumable $consumable)
    {
        $this->queueDeclarer->declareQueue($consumable->getQueueName());
        $consumable->setCallback([$this, 'process']);
        $this->channel->consume($consumable);

        while (count($this->channel->getCallbacks()) > 0) {
            $this->channel->wait();
        }
    }

    /**
     * Declares the consumable's queue and returns the message the channel
     * yields for it, wrapped in a QueuedMessage.
     *
     * @param Consumable $consumable
     *
     * @return QueuedMessage
     */
    public function getMessage(Consumable $consumable) : QueuedMessage
    {
        $this->queueDeclarer->declareQueue($consumable->getQueueName());

        $rawMessage = $this->channel->getMessage($consumable);

        return new QueuedMessage($rawMessage);
    }

    /**
     * @return ChannelInterface The channel obtained in the constructor.
     */
    public function getChannel() : ChannelInterface
    {
        return $this->channel;
    }

    /**
     * Entry point registered on the consumable by consume(): wraps the raw
     * message, passes it (together with the channel) to the registered
     * callback, then runs the restart check.
     *
     * @param AMQPMessage $message
     */
    public function process(AMQPMessage $message)
    {
        $queuedMessage = new QueuedMessage($message);

        // setCallback() accepts null and no default handler is ever assigned,
        // so the callback may be unset here. Guard the call (as checkRestart()
        // does for its callable) instead of invoking null, which raises a
        // warning or a TypeError depending on the PHP version.
        if ($this->callback !== null) {
            ($this->callback)($queuedMessage, $this->channel);
        }

        $this->checkRestart($queuedMessage);
    }

    /**
     * Builds a Queue from the given value and returns the size the channel
     * reports for it.
     *
     * @param mixed $queue Passed as the first Queue constructor argument
     *                     (presumably the queue name; confirm against Queue).
     *
     * @return int
     */
    public function getSize($queue) : int
    {
        $queueParams = new Queue($queue, '', null, true);

        return $this->channel->getQueueSize($queueParams);
    }

    /**
     * Invokes the restart-check callable with the processed message, if one
     * has been registered.
     *
     * @param QueuedMessageInterface $queuedMessage
     */
    private function checkRestart(QueuedMessageInterface $queuedMessage)
    {
        if (!$this->restartCheckCallable) {
            return;
        }

        ($this->restartCheckCallable)($queuedMessage);
    }
}
107