Completed
Push — master ( 65967c...597be2 )
by Cody
02:06
created

Consumer::process()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1.008

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 4
cts 5
cp 0.8
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 1
crap 1.008
1
<?php
2
3
namespace Ccovey\RabbitMQ\Consumer;
4
5
use Ccovey\RabbitMQ\ChannelInterface;
6
use Ccovey\RabbitMQ\Connection\ConnectionInterface;
7
use Ccovey\RabbitMQ\QueuedMessage;
8
use Ccovey\RabbitMQ\QueuedMessageInterface;
9
use PhpAmqpLib\Message\AMQPMessage;
10
11
/**
 * Consumes messages from a channel obtained from a connection and hands each
 * one, wrapped in a QueuedMessage, to a user-supplied callback.
 */
class Consumer implements ConsumerInterface
{
    /**
     * Connection the channel is obtained from.
     *
     * @var ConnectionInterface
     */
    private $connection;

    /**
     * Channel used for consuming and fetching messages.
     *
     * @var ChannelInterface
     */
    private $channel;

    /**
     * Invoked by process() with each wrapped message; null until configured
     * through setCallback().
     *
     * @var callable|null
     */
    private $callback;

    /**
     * Optional hook invoked with the wrapped message after the callback has
     * run; null until configured through setRestartCheckCallable().
     *
     * @var callable|null
     */
    private $restartCheckCallable;

    /**
     * Stores the connection and obtains the channel identified by $channelId
     * from it.
     *
     * @param ConnectionInterface $connection
     * @param string              $channelId  Passed as-is to ConnectionInterface::getChannel().
     */
    public function __construct(ConnectionInterface $connection, string $channelId = '')
    {
        $this->connection = $connection;
        $this->channel = $this->connection->getChannel($channelId);
    }

    /**
     * Sets (or clears, when null is given) the callback that process() invokes
     * for every message.
     *
     * @param callable|null $callback
     */
    public function setCallback(callable $callback = null)
    {
        $this->callback = $callback;
    }

    /**
     * Sets the callable that is invoked with the wrapped message after each
     * message has been handed to the callback.
     *
     * @param callable $callable
     */
    public function setRestartCheckCallable(callable $callable)
    {
        $this->restartCheckCallable = $callable;
    }

    /**
     * Registers process() as the consumable's callback, starts consuming on
     * the channel and keeps waiting on the channel for as long as it reports
     * registered callbacks.
     *
     * @param Consumable $consumable
     */
    public function consume(Consumable $consumable)
    {
        $consumable->setCallback([$this, 'process']);
        $this->channel->consume($consumable);

        while (count($this->channel->getCallbacks())) {
            $this->channel->wait();
        }
    }

    /**
     * Fetches a message for the given consumable from the channel and wraps it
     * in a QueuedMessage.
     *
     * @param Consumable $consumable
     *
     * @return QueuedMessage
     */
    public function getMessage(Consumable $consumable) : QueuedMessage
    {
        return new QueuedMessage($this->channel->getMessage($consumable));
    }

    /**
     * Returns the channel this consumer operates on.
     *
     * @return ChannelInterface
     */
    public function getChannel() : ChannelInterface
    {
        return $this->channel;
    }

    /**
     * Wraps the raw message in a QueuedMessage, passes it to the configured
     * callback (if any) and then runs the restart check.
     *
     * @param AMQPMessage $message
     */
    public function process(AMQPMessage $message)
    {
        $queuedMessage = new QueuedMessage($message);

        // setCallback() accepts null and the property starts out unset, so the
        // callback may legitimately be missing. Invoking null would raise a
        // warning (PHP 7) or a TypeError (PHP 8); skip it instead so the
        // restart check below still runs.
        if ($this->callback !== null) {
            ($this->callback)($queuedMessage);
        }

        $this->checkRestart($queuedMessage);
    }

    /**
     * Invokes the restart-check callable with the message when one has been
     * configured; does nothing otherwise.
     *
     * @param QueuedMessageInterface $queuedMessage
     */
    private function checkRestart(QueuedMessageInterface $queuedMessage)
    {
        if ($this->restartCheckCallable !== null) {
            ($this->restartCheckCallable)($queuedMessage);
        }
    }
}
85