Completed
Pull Request — 0.4 (#35)
by jean
11:34
created

AmqpMessageProvider::process()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 1
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Darkilliant\MqProcessBundle\Message\Provider;
6
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use Darkilliant\MqProcessBundle\Message\Message;
9
10
class AmqpMessageProvider
11
{
12
    /** @var AMQPChannel */
13
    private $channel;
14
    /** @var string */
15
    private $queue;
16
    /** @var array */
17
    private $messages = [];
18
    /** @var string */
19
    private $consumerTag;
20
21
    public function __construct(AMQPChannel $channel, string $queue, string $exchange, bool $persistant, int $batchCount)
0 ignored issues
show
Unused Code introduced by
The parameter $batchCount is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

21
    public function __construct(AMQPChannel $channel, string $queue, string $exchange, bool $persistant, /** @scrutinizer ignore-unused */ int $batchCount)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
22
    {
23
        $this->channel = $channel;
24
        $this->queue = $queue;
25
26
        $this->channel->queue_declare($this->queue, false, $persistant, false, false);
27
        $this->channel->queue_bind($queue, $exchange, $this->queue);
28
        $this->channel->basic_qos(null, 5000, false);
29
    }
30
31
    public function consume($ackRequired = true)
32
    {
33
        $this->channel->basic_consume(
34
            $this->queue,
35
            $this->getConsumerTag(),
36
            // when true, the server will not send messages to the connection that published them
37
            false,
38
            // when true, disable acknowledgements (more speed)
39
            !$ackRequired,
40
            // when true, queues may only be accessed by the current connection
41
            false,
42
            // when true, the server will not respond to the method.
43
            false,
44
            [$this, 'process'] //callback
45
        );
46
    }
47
48
    public function stopConsume()
49
    {
50
        $this->channel->basic_cancel($this->queue);
51
    }
52
53
    public function process($envelope)
54
    {
55
        if (null === $envelope) {
56
            return;
57
        }
58
59
        $this->messages[] = $message = $this->buildMessage($envelope);
0 ignored issues
show
Unused Code introduced by
The assignment to $message is dead and can be removed.
Loading history...
60
    }
61
62
    public function messageOk(Message $message)
63
    {
64
        $this->channel->basic_ack($message->getId());
65
    }
66
67
    public function messageKo(Message $message, bool $requeue)
68
    {
69
        $this->channel->basic_nack($message->getId(), false, $requeue);
70
    }
71
72
    public function waitChannel($blocking = true)
73
    {
74
        $this->channel->wait(null, !$blocking);
75
    }
76
77
    public function fetchMessage()
78
    {
79
        return array_shift($this->messages);
80
    }
81
82
    protected function getConsumerTag(): string
83
    {
84
        if (null === $this->consumerTag) {
85
            $this->consumerTag = sprintf('mqprocess_%s_%s_%s', $this->queue, time(), uniqid());
86
        }
87
88
        return $this->consumerTag;
89
    }
90
91
    private function buildMessage($envelope): Message
92
    {
93
        $properties = [];
94
        $propertyKeys = [
95
            'content_type', 'delivery_mode', 'content_encoding', 'type', 'timestamp', 'priority', 'expiration',
96
            'app_id', 'message_id', 'reply_to', 'correlation_id', 'user_id', 'cluster_id', 'channel', 'consumer_tag',
97
            'delivery_tag', 'redelivered', 'exchange', 'routing_key',
98
        ];
99
100
        foreach ($propertyKeys as $key) {
101
            if ($envelope->has($key)) {
102
                $properties[$key] = $envelope->get($key);
103
            }
104
        }
105
106
        $properties['headers'] = [];
107
        if ($envelope->has('application_headers')) {
108
            foreach ($envelope->get('application_headers') as $key => $value) {
109
                $properties['headers'][$key] = $value[1];
110
            }
111
        }
112
113
        return new Message($envelope->body, $properties, $envelope->get('delivery_tag'));
114
    }
115
}
116