Test Failed
Push — master ( 82b045...1f684d )
by Mike
04:10
created

Consumer   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 99
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 43
dl 0
loc 99
ccs 0
cts 60
cp 0
rs 10
c 0
b 0
f 0
wmc 6

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 1
A consumeQueries() 0 37 4
A collectMessages() 0 19 1
1
<?php
2
3
4
namespace Xervice\RabbitMQ\Business\Model\Worker\Consumer;
5
6
7
use DataProvider\RabbitMqConsumerConfigDataProvider;
8
use DataProvider\RabbitMqMessageCollectionDataProvider;
9
use DataProvider\RabbitMqMessageDataProvider;
10
use DataProvider\RabbitMqQueueDataProvider;
11
use PhpAmqpLib\Channel\AMQPChannel;
12
use PhpAmqpLib\Message\AMQPMessage;
13
use Xervice\RabbitMQ\Business\Dependency\Worker\Listener\ListenerInterface;
14
15
class Consumer implements ConsumerInterface
{
    /**
     * Channel used to configure QoS, register the consumer and wait for deliveries.
     *
     * @var \PhpAmqpLib\Channel\AMQPChannel
     */
    private $channel;

    /**
     * Consumer settings (tag, flags, ticket, arguments) forwarded to basic_consume().
     *
     * @var \DataProvider\RabbitMqConsumerConfigDataProvider
     */
    private $config;

    /**
     * Messages gathered by collectMessages() and handed to the listener.
     *
     * @var \DataProvider\RabbitMqMessageCollectionDataProvider
     */
    private $messageCollection;

    /**
     * Stores the channel and consumer config and starts with an empty message collection.
     *
     * @param \PhpAmqpLib\Channel\AMQPChannel $channel
     * @param \DataProvider\RabbitMqConsumerConfigDataProvider $config
     */
    public function __construct(
        AMQPChannel $channel,
        RabbitMqConsumerConfigDataProvider $config
    ) {
        $this->channel = $channel;
        $this->config = $config;

        $this->messageCollection = new RabbitMqMessageCollectionDataProvider();
    }

    /**
     * Registers this consumer on the listener's queue, collects incoming messages
     * until waiting on the channel throws (or no callbacks remain), and then passes
     * the collected messages together with the channel to the listener.
     *
     * @param \Xervice\RabbitMQ\Business\Dependency\Worker\Listener\ListenerInterface $listener
     *
     * @return void
     */
    public function consumeQueries(ListenerInterface $listener): void
    {
        // The listener's chunk size is passed as the prefetch count.
        $this->channel->basic_qos(
            0,
            $listener->getChunkSize(),
            false
        );

        $this->channel->basic_consume(
            $listener->getQueueName(),
            $this->config->getTag(),
            $this->config->getNoLocal(),
            $this->config->getNoAck(),
            $this->config->getExclusive(),
            $this->config->getNoWait(),
            [
                $this,
                'collectMessages'
            ],
            $this->config->getTicket(),
            $this->config->getArguments()
        );

        try {
            while (\count($this->channel->callbacks)) {
                $this->channel->wait(
                    null,
                    false,
                    1
                );
            }
        } catch (\Exception $e) {
            // Any exception raised while waiting ends the collection phase; whatever
            // was gathered so far is still handed to the listener below.
            // NOTE(review): presumably this is meant for the wait() timeout (third
            // argument) - confirm whether other failures should be swallowed as well.
        }

        $listener->handleMessage($this->messageCollection, $this->channel);
    }

    /**
     * Consumer callback: converts a delivered AMQP message into a
     * RabbitMqMessageDataProvider and appends it to the message collection.
     *
     * @param \PhpAmqpLib\Message\AMQPMessage $message
     *
     * @return void
     */
    public function collectMessages(AMQPMessage $message): void
    {
        // NOTE(review): json_decode() yields null for a body that is not valid JSON;
        // confirm fromArray() copes with that or that producers always send JSON objects.
        $payload = json_decode(
            $message->getBody(),
            true
        );

        $rabbitMessage = new RabbitMqMessageDataProvider();
        $rabbitMessage->fromArray($payload);
        $rabbitMessage
            ->setProperties($message->get_properties())
            ->setDeliveryInfo($message->delivery_info)
            ->setDeliveryTag($message->delivery_info['delivery_tag'] ?? 0);

        $this->messageCollection->addMessage($rabbitMessage);
    }
}