Passed
Pull Request — 0.4 (#35)
by jean
03:52
created

AmqpMessageProvider::messageOk()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 1
dl 0
loc 3
ccs 0
cts 3
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Darkilliant\MqProcessBundle\Message\Provider;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use Darkilliant\MqProcessBundle\Message\Message;
7
8
class AmqpMessageProvider
9
{
10
    /** @var AMQPChannel */
11
    private $channel;
12
    /** @var string */
13
    private $queue;
14
    /** @var array */
15
    private $messages = [];
16
17
    public function __construct(AMQPChannel $channel, string $queue, string $exchange, bool $persistant, int $batchCount)
0 ignored issues
show
Unused Code introduced by
The parameter $batchCount is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

17
    public function __construct(AMQPChannel $channel, string $queue, string $exchange, bool $persistant, /** @scrutinizer ignore-unused */ int $batchCount)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
18
    {
19
        $this->channel = $channel;
20
        $this->queue = $queue;
21
22
        $this->channel->queue_declare($this->queue, false, $persistant, false, false);
23
        $this->channel->queue_bind($queue, $exchange, $this->queue);
24
        $this->channel->basic_qos(null, 5000, false);
25
    }
26
27
    public function consume($ackRequired = true)
28
    {
29
        $this->channel->basic_consume(
30
            $this->queue,
31
            'mqprocess_'.$this->queue,
32
            // when true, the server will not send messages to the connection that published them
33
            false,
34
            // when true, disable acknowledgements (more speed)
35
            !$ackRequired,
36
            // when true, queues may only be accessed by the current connection
37
            false,
38
            // when true, the server will not respond to the method.
39
            false,
40
            [$this, 'process'] //callback
41
        );
42
    }
43
44
    public function stopConsume()
45
    {
46
        $this->channel->basic_cancel($this->queue);
47
    }
48
49
    public function process($envelope)
50
    {
51
        if (null === $envelope) {
52
            return;
53
        }
54
55
        $this->messages[] = $message = $this->buildMessage($envelope);
0 ignored issues
show
Unused Code introduced by
The assignment to $message is dead and can be removed.
Loading history...
56
    }
57
58
    public function messageOk(Message $message)
59
    {
60
        $this->channel->basic_ack($message->getId());
61
    }
62
63
    public function messageKo(Message $message)
64
    {
65
        $this->channel->basic_nack($message->getId());
66
    }
67
68
    public function waitChannel($blocking = true)
69
    {
70
        $this->channel->wait(null, !$blocking);
71
    }
72
73
    public function fetchMessage()
74
    {
75
        return array_shift($this->messages);
76
    }
77
78
    public function isFinish()
79
    {
80
        return !count($this->channel->callbacks);
81
    }
82
83
    private function buildMessage($envelope): Message
84
    {
85
        $properties = [];
86
        $propertyKeys = [
87
            'content_type', 'delivery_mode', 'content_encoding', 'type', 'timestamp', 'priority', 'expiration',
88
            'app_id', 'message_id', 'reply_to', 'correlation_id', 'user_id', 'cluster_id', 'channel', 'consumer_tag',
89
            'delivery_tag', 'redelivered', 'exchange', 'routing_key',
90
        ];
91
92
        foreach ($propertyKeys as $key) {
93
            if ($envelope->has($key)) {
94
                $properties[$key] = $envelope->get($key);
95
            }
96
        }
97
98
        $properties['headers'] = [];
99
        if ($envelope->has('application_headers')) {
100
            foreach ($envelope->get('application_headers') as $key => $value) {
101
                $properties['headers'][$key] = $value[1];
102
            }
103
        }
104
105
        return new Message($envelope->body, $properties, $envelope->get('delivery_tag'));
106
    }
107
}
108