Passed
Push — master ( 055b76...bacc0b )
by Patrick
01:33 queued 12s
created

Consumer::consume()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 29
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 4
eloc 17
c 1
b 0
f 1
nc 4
nop 1
dl 0
loc 29
ccs 0
cts 18
cp 0
crap 20
rs 9.7
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of forecast.it.fill project.
7
 * (c) Patrick Jaja <[email protected]>
8
 * This source file is subject to the MIT license that is bundled
9
 * with this source code in the file LICENSE.
10
 */
11
12
namespace ForecastAutomation\AmqpClient\Business;
13
14
use Enqueue\AmqpExt\AmqpContext;
15
use ForecastAutomation\Log\LogFacade;
16
use ForecastAutomation\QueueClient\Shared\Dto\MessageDto;
17
use ForecastAutomation\QueueClient\Shared\Plugin\QueuePluginCollection;
18
use ForecastAutomation\Serializer\SerializerFacade;
19
20
class Consumer
21
{
22
    public function __construct(
23
        private AmqpContext $context,
24
        private QueuePluginCollection $queuePluginCollection,
25
        private SerializerFacade $serializerFacade,
26
        private LogFacade $logFacade
27
    ) {
28
    }
29
30
    public function consume(string $queueName): void
31
    {
32
        // https://blog.forma-pro.com/getting-started-with-rabbitmq-in-php-84d331e20a66
33
        $queue = $this->context->createQueue($this->queuePluginCollection->offsetGet($queueName)->getQueueName());
34
        $this->context->declareQueue($queue);
35
36
        $consumer = $this->context->createConsumer($queue);
37
38
        $totalMessages = 0;
39
        $maxMessages = 5000;
40
        while ($totalMessages < $maxMessages) {
41
            $message = $consumer->receive($timeout = 10);
42
43
            if (null === $message) {
44
                break;
45
            }
46
            $this->logFacade->info('Got AMQP Raw Message.', ['amqp_message' => $message]);
47
            $messageDto = (new MessageDto(...$this->serializerFacade->deserialize($message->getBody())));
0 ignored issues
show
Bug introduced by
$this->serializerFacade-...ze($message->getBody()) is expanded, but the parameter $data of ForecastAutomation\Queue...ssageDto::__construct() does not expect variable arguments. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

47
            $messageDto = (new MessageDto(/** @scrutinizer ignore-type */ ...$this->serializerFacade->deserialize($message->getBody())));
Loading history...
48
            $messageDto->setAdapterMetaResponse(['properties' => $message->getProperties(), 'headers' => $message->getHeaders(), 'deliveryTag' => $message->getDeliveryTag(), 'consumerTag' => $message->getConsumerTag(), 'redelivered' => $message->isRedelivered(), 'flags' => $message->getFlags(), 'routingKey' => $message->getRoutingKey()]);
49
50
            $messageConsumed = $this->queuePluginCollection->offsetGet($queueName)->consumeMessage(
51
                $messageDto
52
            );
53
54
            if ($messageConsumed) {
55
                $consumer->acknowledge($message);
56
            }
57
58
            ++$totalMessages;
59
        }
60
    }
61
}
62