Consumer::collectMessages()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 19
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 13
nc 1
nop 1
dl 0
loc 19
ccs 12
cts 12
cp 1
crap 1
rs 9.8333
c 0
b 0
f 0
1
<?php
2
3
4
namespace Xervice\RabbitMQ\Business\Model\Worker\Consumer;
5
6
7
use DataProvider\RabbitMqConsumerConfigDataProvider;
8
use DataProvider\RabbitMqMessageCollectionDataProvider;
9
use DataProvider\RabbitMqMessageDataProvider;
10
use DataProvider\RabbitMqQueueDataProvider;
11
use PhpAmqpLib\Channel\AMQPChannel;
12
use PhpAmqpLib\Message\AMQPMessage;
13
use Xervice\RabbitMQ\Business\Dependency\Worker\Listener\ListenerInterface;
14
15
class Consumer implements ConsumerInterface
16
{
17
    /**
18
     * @var \PhpAmqpLib\Channel\AMQPChannel
19
     */
20
    private $channel;
21
22
    /**
23
     * @var \DataProvider\RabbitMqConsumerConfigDataProvider
24
     */
25
    private $config;
26
27
    /**
28
     * @var \DataProvider\RabbitMqMessageCollectionDataProvider
29
     */
30
    private $messageCollection;
31
32
    /**
33
     * @var \Xervice\RabbitMQ\Business\Dependency\Worker\Listener\ListenerInterface
34
     */
35
    private $listener;
36
37
    /**
38
     * Consumer constructor.
39
     *
40
     * @param \PhpAmqpLib\Channel\AMQPChannel $channel
41
     * @param \DataProvider\RabbitMqConsumerConfigDataProvider $config
42
     */
43 2
    public function __construct(
44
        AMQPChannel $channel,
45
        RabbitMqConsumerConfigDataProvider $config,
46
        ListenerInterface $listener
47
    ) {
48 2
        $this->channel = $channel;
49 2
        $this->config = $config;
50 2
        $this->listener = $listener;
51
52 2
        $this->messageCollection = new RabbitMqMessageCollectionDataProvider();
53 2
    }
54
55
    /**
56
     * @return void
57
     */
58 2
    public function consumeQueries(): void
59
    {
60 2
        $this->channel->basic_qos(
61 2
            0,
62 2
            $this->listener->getChunkSize(),
63 2
            false
64
        );
65
66 2
        $this->channel->basic_consume(
67 2
            $this->listener->getQueueName(),
68 2
            $this->config->getTag(),
69 2
            $this->config->getNoLocal(),
70 2
            $this->config->getNoAck(),
71 2
            $this->config->getExclusive(),
72 2
            $this->config->getNoWait(),
73
            [
74 2
                $this,
75 2
                'collectMessages'
76
            ],
77 2
            $this->config->getTicket(),
78 2
            $this->config->getArguments()
79
        );
80
81
        try {
82 2
            while (\count($this->channel->callbacks)) {
83 2
                $this->channel->wait(
84 2
                    null,
85 2
                    false,
86 2
                    1
87
                );
88
            }
89 2
        } catch (\Exception $e) {
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
90
        }
91
92 2
        $this->listener->handleMessage($this->messageCollection, $this->channel);
93 2
    }
94
95
    /**
96
     * @param \PhpAmqpLib\Message\AMQPMessage $message
97
     */
98 2
    public function collectMessages(AMQPMessage $message): void
99
    {
100 2
        $rabbitMessage = new RabbitMqMessageDataProvider();
101
        $rabbitMessage
102 2
            ->fromArray(
103 2
                json_decode(
104 2
                    $message->getBody(),
105 2
                    true
106
                )
107
            );
108
        $rabbitMessage
109 2
            ->setProperties($message->get_properties())
110 2
            ->setDeliveryInfo($message->delivery_info)
111 2
            ->setDeliveryTag($message->delivery_info['delivery_tag'] ?? 0);
112
113 2
        $queue = new RabbitMqQueueDataProvider();
114 2
        $queue->setName($message->delivery_info['exchange']);
115
116 2
        $this->messageCollection->addMessage($rabbitMessage);
117
    }
118
}