Issues (10)

src/Rabbit/Consumer.php (1 issue)

1
<?php
2
3
declare(strict_types=1);
4
5
namespace Umbrellio\TableSync\Rabbit;
6
7
use Closure;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use Psr\Log\LoggerInterface;
10
use Psr\Log\NullLogger;
11
12
/**
 * Consumes messages from the configured queue and hands each one to the
 * handler supplied by the consumer config.
 *
 * Each message is acknowledged only after the handler has returned; nothing
 * in this class catches exceptions thrown by the handler.
 */
class Consumer
{
    /** @var ChannelContainer Source of the AMQP channel used for consuming and acking. */
    private $channelContainer;

    /** @var MessageBuilder Converts raw AMQP messages into received-message objects. */
    private $messageBuilder;

    /** @var Config\Consumer Provides the queue, consumer tag, sleep interval and handler. */
    private $config;

    /** @var LoggerInterface Logger in use; a NullLogger when none was injected. */
    private $logger;

    /** @var bool Loop flag: set to true by consume(), set to false by terminate(). */
    private $working = false;

    /**
     * @param ChannelContainer     $channelContainer Source of the AMQP channel.
     * @param MessageBuilder       $messageBuilder   Builder for received messages.
     * @param Config\Consumer      $config           Consumer configuration.
     * @param LoggerInterface|null $logger           Optional logger; defaults to a NullLogger.
     */
    public function __construct(
        ChannelContainer $channelContainer,
        MessageBuilder $messageBuilder,
        Config\Consumer $config,
        ?LoggerInterface $logger = null
    ) {
        $this->messageBuilder = $messageBuilder;
        $this->channelContainer = $channelContainer;
        $this->config = $config;
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Registers this consumer on the configured queue and processes incoming
     * messages until terminate() is called or the channel has no callbacks left.
     *
     * Note: the loop flag is set to true here, so a terminate() call made
     * before consume() starts has no effect.
     */
    public function consume(): void
    {
        $channel = $this->channelContainer->getChannel();

        $channel->basic_consume(
            $this->config->queue(),
            $this->config->consumerTag(),
            false, // no_local
            false, // no_ack: messages are acknowledged explicitly in handle()
            false, // exclusive
            false, // nowait
            Closure::fromCallable([$this, 'handle'])
        );

        $this->working = true;
        while (count($channel->callbacks) && $this->working) {
            // Pause for the configured interval before each wait on the channel.
            usleep($this->config->microsecondsToSleep());
            $channel->wait();
        }
    }

    /**
     * Asks the consume() loop to stop; the flag is checked before the next iteration.
     */
    public function terminate(): void
    {
        $this->working = false;
    }

    /**
     * Callback for a delivered message: builds the received message, passes it
     * to the configured handler, acknowledges it and logs the result.
     *
     * @param AMQPMessage $amqpMessage Message delivered by the channel.
     */
    private function handle(AMQPMessage $amqpMessage): void
    {
        $message = $this->messageBuilder->buildReceivedMessage($amqpMessage);
        // AMQPMessage::$delivery_info is deprecated in php-amqplib (reported by
        // the code review); the delivery tag is read through its getter instead.
        $messageId = $amqpMessage->getDeliveryTag();

        $this->config->handler()
            ->handle($message);
        $this->channelContainer->getChannel()
            ->basic_ack($messageId);
        $this->logger->info("Message #{$messageId} correctly handled", [
            'direction' => 'receive',
            'body' => $amqpMessage->getBody(),
        ]);
    }
}
74