Completed
Push — master ( 1f684d...57b871 )
by Mike
02:29
created

Consumer::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nc 1
nop 2
dl 0
loc 8
ccs 4
cts 4
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
4
namespace Xervice\RabbitMQ\Business\Model\Worker\Consumer;
5
6
7
use DataProvider\RabbitMqConsumerConfigDataProvider;
8
use DataProvider\RabbitMqMessageCollectionDataProvider;
9
use DataProvider\RabbitMqMessageDataProvider;
10
use DataProvider\RabbitMqQueueDataProvider;
11
use PhpAmqpLib\Channel\AMQPChannel;
12
use PhpAmqpLib\Message\AMQPMessage;
13
use Xervice\RabbitMQ\Business\Dependency\Worker\Listener\ListenerInterface;
14
15
class Consumer implements ConsumerInterface
16
{
17
    /**
18
     * @var \PhpAmqpLib\Channel\AMQPChannel
19
     */
20
    private $channel;
21
22
    /**
23
     * @var \DataProvider\RabbitMqConsumerConfigDataProvider
24
     */
25
    private $config;
26
27
    /**
28
     * @var \DataProvider\RabbitMqMessageCollectionDataProvider
29
     */
30
    private $messageCollection;
31
32
    /**
33
     * Consumer constructor.
34
     *
35
     * @param \PhpAmqpLib\Channel\AMQPChannel $channel
36
     * @param \DataProvider\RabbitMqConsumerConfigDataProvider $config
37
     */
38 2
    public function __construct(
39
        AMQPChannel $channel,
40
        RabbitMqConsumerConfigDataProvider $config
41
    ) {
42 2
        $this->channel = $channel;
43 2
        $this->config = $config;
44
45 2
        $this->messageCollection = new RabbitMqMessageCollectionDataProvider();
46 2
    }
47
48
    /**
49
     * @param \Xervice\RabbitMQ\Business\Dependency\Worker\Listener\ListenerInterface $listener
50
     *
51
     * @return void
52
     */
53 2
    public function consumeQueries(ListenerInterface $listener): void
54
    {
55 2
        $this->channel->basic_qos(
56 2
            0,
57 2
            $listener->getChunkSize(),
58 2
            false
59
        );
60
61 2
        $this->channel->basic_consume(
62 2
            $listener->getQueueName(),
63 2
            $this->config->getTag(),
64 2
            $this->config->getNoLocal(),
65 2
            $this->config->getNoAck(),
66 2
            $this->config->getExclusive(),
67 2
            $this->config->getNoWait(),
68
            [
69 2
                $this,
70 2
                'collectMessages'
71
            ],
72 2
            $this->config->getTicket(),
73 2
            $this->config->getArguments()
74
        );
75
76
        try {
77 2
            $finished = false;
78 2
            while (\count($this->channel->callbacks) && !$finished) {
79 2
                $this->channel->wait(
80 2
                    null,
81 2
                    false,
82 2
                    1
83
                );
84
            }
85 2
        } catch (\Exception $e) {
86 2
            $finished = true;
0 ignored issues
show
Unused Code introduced by
The assignment to $finished is dead and can be removed.
Loading history...
87
        }
88
89 2
        $listener->handleMessage($this->messageCollection, $this->channel);
90 2
    }
91
92
    /**
93
     * @param \PhpAmqpLib\Message\AMQPMessage $message
94
     */
95 2
    public function collectMessages(AMQPMessage $message): void
96
    {
97 2
        $rabbitMessage = new RabbitMqMessageDataProvider();
98
        $rabbitMessage
99 2
            ->fromArray(
100 2
                json_decode(
101 2
                    $message->getBody(),
102 2
                    true
103
                )
104
            );
105
        $rabbitMessage
106 2
            ->setProperties($message->get_properties())
107 2
            ->setDeliveryInfo($message->delivery_info)
108 2
            ->setDeliveryTag($message->delivery_info['delivery_tag'] ?? 0);
109
110 2
        $queue = new RabbitMqQueueDataProvider();
111 2
        $queue->setName($message->delivery_info['exchange']);
112
113 2
        $this->messageCollection->addMessage($rabbitMessage);
114
    }
115
}