Completed
Push — master ( 9f400a...658390 )
by Cody
02:13
created

Consumer::getSize()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Ccovey\RabbitMQ\Consumer;
4
5
use Ccovey\RabbitMQ\ChannelInterface;
6
use Ccovey\RabbitMQ\Connection\ConnectionInterface;
7
use Ccovey\RabbitMQ\Queue;
8
use Ccovey\RabbitMQ\QueuedMessage;
9
use Ccovey\RabbitMQ\QueuedMessageInterface;
10
use PhpAmqpLib\Message\AMQPMessage;
11
12
/**
 * Consumes messages from one channel of a connection and forwards each of
 * them, wrapped in a QueuedMessage, to a user-supplied callback.
 */
class Consumer implements ConsumerInterface
{
    /**
     * @var ConnectionInterface Connection the channel is obtained from.
     */
    private $connection;

    /**
     * @var ChannelInterface Channel fetched from the connection in the constructor.
     */
    private $channel;

    /**
     * @var callable|null Message handler used by process(); null until
     *                    setCallback() supplies one (or after it is reset to null).
     */
    private $callback;

    /**
     * @var callable|null Optional hook run after every processed message;
     *                    null until setRestartCheckCallable() is called.
     */
    private $restartCheckCallable;

    /**
     * Stores the connection and asks it for the channel identified by $channelId.
     *
     * @param ConnectionInterface $connection Connection that provides the channel.
     * @param string              $channelId  Identifier passed to getChannel(); defaults to ''.
     */
    public function __construct(ConnectionInterface $connection, string $channelId = '')
    {
        $this->connection = $connection;
        $this->channel = $this->connection->getChannel($channelId);
    }

    /**
     * Sets (or, when null is given, clears) the handler invoked by process().
     *
     * @param callable|null $callback Called with the QueuedMessage and the channel.
     */
    public function setCallback(callable $callback = null)
    {
        $this->callback = $callback;
    }

    /**
     * Sets the hook that process() runs after the message callback.
     *
     * @param callable $callable Called with the processed QueuedMessage.
     */
    public function setRestartCheckCallable(callable $callable)
    {
        $this->restartCheckCallable = $callable;
    }

    /**
     * Registers process() as the consumable's callback, starts consuming on
     * the channel, then keeps waiting on the channel for as long as it
     * reports at least one registered callback.
     *
     * @param Consumable $consumable Describes what to consume.
     */
    public function consume(Consumable $consumable)
    {
        $consumable->setCallback([$this, 'process']);
        $this->channel->consume($consumable);

        while (count($this->channel->getCallbacks())) {
            $this->channel->wait();
        }
    }

    /**
     * Fetches a single message for the consumable from the channel and wraps
     * it in a QueuedMessage.
     *
     * @param Consumable $consumable Describes what to fetch.
     *
     * @return QueuedMessage
     */
    public function getMessage(Consumable $consumable) : QueuedMessage
    {
        return new QueuedMessage($this->channel->getMessage($consumable));
    }

    /**
     * Returns the channel this consumer operates on.
     *
     * @return ChannelInterface
     */
    public function getChannel() : ChannelInterface
    {
        return $this->channel;
    }

    /**
     * Handles one delivered message: wraps it, passes it to the configured
     * callback together with the channel, then runs the restart check.
     *
     * @param AMQPMessage $message Raw message delivered by the channel.
     */
    public function process(AMQPMessage $message)
    {
        $queuedMessage = new QueuedMessage($message);

        // The callback is optional: the property starts out unset and
        // setCallback() accepts null. Passing null on to call_user_func()
        // triggers an invalid-callback warning (a TypeError on PHP 8), so the
        // call is skipped instead, the same way checkRestart() treats its
        // optional callable.
        if ($this->callback !== null) {
            ($this->callback)($queuedMessage, $this->channel);
        }

        $this->checkRestart($queuedMessage);
    }

    /**
     * Asks the channel for the size of the given queue.
     *
     * @param mixed $queue Queue identifier, passed as the first Queue constructor argument.
     *
     * @return int Value reported by the channel's getQueueSize().
     */
    public function getSize($queue) : int
    {
        // NOTE(review): the trailing `true` presumably marks the declaration
        // as passive so the queue is only inspected - confirm against Queue.
        $queueParams = new Queue($queue, '', null, true);

        return $this->channel->getQueueSize($queueParams);
    }

    /**
     * Runs the restart-check hook for a processed message, if one was set.
     *
     * @param QueuedMessageInterface $queuedMessage Message that was just processed.
     */
    private function checkRestart(QueuedMessageInterface $queuedMessage)
    {
        if ($this->restartCheckCallable !== null) {
            ($this->restartCheckCallable)($queuedMessage);
        }
    }
}
98