Completed
Pull Request — 0.4 (#35)
by jean
03:28
created

AmqpMessageProvider::buildMessage()   A

Complexity

Conditions 5
Paths 6

Size

Total Lines 23
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
cc 5
eloc 13
nc 6
nop 1
dl 0
loc 23
ccs 0
cts 20
cp 0
crap 30
rs 9.5222
c 0
b 0
f 0
1
<?php
2
3
namespace Darkilliant\MqProcessBundle\Message\Provider;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use Darkilliant\MqProcessBundle\Message\Message;
7
8
class AmqpMessageProvider
9
{
10
    private $channel;
11
    private $queue;
12
    private $messages = [];
13
14
    public function __construct(AMQPChannel $channel, string $queue, string $exchange)
15
    {
16
        $this->channel = $channel;
17
        $this->queue = $queue;
18
19
        $this->channel->queue_bind($queue, $exchange);
20
        $this->channel->basic_qos(null, 5000, false);
21
    }
22
23
    public function consume($ackRequired = true)
24
    {
25
        $this->channel->basic_consume(
26
            $this->queue,
27
            'mqprocess_'.$this->queue,
28
            // when true, the server will not send messages to the connection that published them
29
            false,
30
            // when true, disable acknowledgements (more speed)
31
            !$ackRequired,
32
            // when true, queues may only be accessed by the current connection
33
            false,
34
            // when true, the server will not respond to the method.
35
            false,
36
            [$this, 'process'] //callback
37
        );
38
    }
39
40
    public function stopConsume()
41
    {
42
        $this->channel->basic_cancel($this->queue);
43
    }
44
45
    public function process($envelope)
46
    {
47
        if (null === $envelope) {
48
            return;
49
        }
50
51
        $this->messages[] = $message = $this->buildMessage($envelope);
0 ignored issues
show
Unused Code introduced by
The assignment to $message is dead and can be removed.
Loading history...
52
    }
53
54
    public function messageOk(Message $message)
55
    {
56
        $this->channel->basic_ack($message->getId());
57
    }
58
59
    public function messageKo(Message $message)
60
    {
61
        $this->channel->basic_nack($message->getId());
62
    }
63
64
    public function waitChannel($blocking = true)
65
    {
66
        $this->channel->wait(null, !$blocking);
67
    }
68
69
    public function fetchMessage()
70
    {
71
        return array_shift($this->messages);
72
    }
73
74
    public function isFinish()
75
    {
76
        return !count($this->channel->callbacks);
77
    }
78
79
    private function buildMessage($envelope): Message
80
    {
81
        $properties = [];
82
        $propertyKeys = [
83
            'content_type', 'delivery_mode', 'content_encoding', 'type', 'timestamp', 'priority', 'expiration',
84
            'app_id', 'message_id', 'reply_to', 'correlation_id', 'user_id', 'cluster_id', 'channel', 'consumer_tag',
85
            'delivery_tag', 'redelivered', 'exchange', 'routing_key',
86
        ];
87
88
        foreach ($propertyKeys as $key) {
89
            if ($envelope->has($key)) {
90
                $properties[$key] = $envelope->get($key);
91
            }
92
        }
93
94
        $properties['headers'] = [];
95
        if ($envelope->has('application_headers')) {
96
            foreach ($envelope->get('application_headers') as $key => $value) {
97
                $properties['headers'][$key] = $value[1];
98
            }
99
        }
100
101
        return new Message($envelope->body, $properties, $envelope->get('delivery_tag'));
102
    }
103
}
104