Completed
Push — master ( cfbf3b...0ff9ac )
by Cody
02:04
created

Consumer::setRestartCheckCallable()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Ccovey\RabbitMQ\Consumer;
4
5
use Ccovey\RabbitMQ\ChannelInterface;
6
use Ccovey\RabbitMQ\Connection\ConnectionInterface;
7
use Ccovey\RabbitMQ\QueuedMessage;
8
use Ccovey\RabbitMQ\QueuedMessageInterface;
9
use PhpAmqpLib\Message\AMQPMessage;
10
11
/**
 * Consumes messages from a channel and hands each one to a user-supplied callback.
 *
 * After every processed message an optional "restart check" callable is invoked
 * with the same message.
 */
class Consumer implements ConsumerInterface
{
    /**
     * Connection the channel is obtained from.
     *
     * @var ConnectionInterface
     */
    private $connection;

    /**
     * Channel used to register the consumable and wait for deliveries.
     *
     * @var ChannelInterface
     */
    private $channel;

    /**
     * Handler invoked with each QueuedMessage; null until set via setCallback().
     *
     * @var callable|null
     */
    private $callback;

    /**
     * Optional callable invoked with each QueuedMessage after the handler has run.
     *
     * @var callable|null
     */
    private $restartCheckCallable;

    /**
     * @param ConnectionInterface $connection connection that provides the channel
     * @param string              $channelId  identifier passed to getChannel(); defaults to ''
     */
    public function __construct(ConnectionInterface $connection, string $channelId = '')
    {
        $this->connection = $connection;
        $this->channel = $this->connection->getChannel($channelId);
    }

    /**
     * Sets (or clears, when null) the handler invoked for each received message.
     *
     * @param callable|null $callback receives a QueuedMessage as its only argument
     */
    public function setCallback(callable $callback = null)
    {
        $this->callback = $callback;
    }

    /**
     * Sets the callable invoked after each message has been handled.
     *
     * @param callable $callable receives the QueuedMessage that was just processed
     */
    public function setRestartCheckCallable(callable $callable)
    {
        $this->restartCheckCallable = $callable;
    }

    /**
     * Registers this consumer's process() method as the consumable's callback,
     * starts consuming on the channel, and waits on the channel for as long as
     * it reports registered callbacks.
     *
     * @param Consumable $consumable description of what to consume
     */
    public function consume(Consumable $consumable)
    {
        $consumable->setCallback([$this, 'process']);
        $this->channel->consume($consumable);

        while (count($this->channel->getCallbacks())) {
            $this->channel->wait();
        }
    }

    /**
     * Wraps the raw AMQP message in a QueuedMessage, passes it to the handler
     * (when one is set) and then runs the restart check.
     *
     * @param AMQPMessage $message raw message delivered by the channel
     */
    public function process(AMQPMessage $message)
    {
        $queuedMessage = new QueuedMessage($message);

        // The handler is optional: setCallback() accepts null and the property
        // starts out unset, so guard before invoking instead of passing null
        // to call_user_func().
        if ($this->callback !== null) {
            ($this->callback)($queuedMessage);
        }

        $this->checkRestart($queuedMessage);
    }

    /**
     * Invokes the restart-check callable, if one was configured.
     *
     * @param QueuedMessageInterface $queuedMessage message that was just processed
     */
    private function checkRestart(QueuedMessageInterface $queuedMessage)
    {
        if ($this->restartCheckCallable !== null) {
            ($this->restartCheckCallable)($queuedMessage);
        }
    }
}
75