Completed
Pull Request — 0.4 (#35)
by jean
10:46
created

AmqpMessageProvider   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 107
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 42
dl 0
loc 107
rs 10
c 0
b 0
f 0
wmc 16

10 Methods

Rating   Name   Duplication   Size   Complexity  
A consume() 0 14 1
A buildMessage() 0 23 5
A messageOk() 0 3 1
A process() 0 7 2
A getConsumerTag() 0 7 2
A fetchMessage() 0 3 1
A waitChannel() 0 3 1
A __construct() 0 8 1
A stopConsume() 0 3 1
A messageKo() 0 3 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Darkilliant\MqProcessBundle\Message\Provider;
6
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use Darkilliant\MqProcessBundle\Message\Message;
9
use PhpAmqpLib\Message\AMQPMessage;
10
11
class AmqpMessageProvider implements MessageProviderInterface
12
{
13
    /** @var AMQPChannel Channel through which every broker call of this provider is made */
14
    private $channel;
15
    /** @var string Name of the queue that is declared, bound and consumed */
16
    private $queue;
17
    /** @var Message[] Messages built by process() and not yet handed out by fetchMessage() */
18
    private $messages = [];
19
    /** @var string|null Consumer tag; stays null until getConsumerTag() generates it */
20
    private $consumerTag;
21
22
    /**
     * Declares the queue, binds it to the exchange and sets the channel prefetch.
     *
     * @param AMQPChannel $channel    channel used for every broker call of this provider
     * @param string      $queue      queue name; also used as the routing key of the binding
     * @param string      $exchange   exchange the queue is bound to
     * @param bool        $persistant forwarded as the third argument of queue_declare()
     *                                (the "durable" flag in php-amqplib)
     * @param int         $batchCount not read by this constructor; kept so that existing
     *                                callers and service definitions keep working
     */
    public function __construct(
        AMQPChannel $channel,
        string $queue,
        string $exchange,
        bool $persistant,
        /** @scrutinizer ignore-unused */ int $batchCount
    ) {
        $this->queue = $queue;
        $this->channel = $channel;

        // The queue must exist before it can be bound, so declare first.
        $channel->queue_declare($queue, false, $persistant, false, false);
        $channel->queue_bind($queue, $exchange, $queue);

        // NOTE(review): the prefetch count is hard-coded while $batchCount is ignored;
        // confirm whether the two were meant to be linked.
        $prefetchCount = 5000;
        $channel->basic_qos(null, $prefetchCount, false);
    }
31
32
    /**
     * Registers this provider as a consumer of the queue.
     *
     * Every delivery is routed to process(), which buffers it until
     * fetchMessage() is called.
     *
     * @param bool $ackRequired when false, the consumer is registered with
     *                          acknowledgements disabled (faster, but no redelivery)
     */
    public function consume($ackRequired = true)
    {
        // when true, the server will not send messages to the connection that published them
        $noLocal = false;
        // when true, disable acknowledgements (more speed)
        $noAck = !$ackRequired;
        // when true, queues may only be accessed by the current connection
        $exclusive = false;
        // when true, the server will not respond to the method
        $noWait = false;
        $callback = [$this, 'process'];

        $this->channel->basic_consume(
            $this->queue,
            $this->getConsumerTag(),
            $noLocal,
            $noAck,
            $exclusive,
            $noWait,
            $callback
        );
    }
48
49
    /**
     * Cancels the consumer registered by consume().
     *
     * basic_cancel() identifies the consumer by its consumer tag, not by the
     * queue name, so the same tag that consume() passed to basic_consume()
     * must be used here; otherwise the broker has nothing matching to cancel
     * and deliveries keep arriving.
     */
    public function stopConsume()
    {
        $this->channel->basic_cancel($this->getConsumerTag());
    }
53
54
    /**
     * Consumer callback: converts a delivery into a Message and buffers it.
     *
     * @param AMQPMessage|null $envelope delivery handed over by the channel;
     *                                   a null envelope is ignored
     */
    public function process($envelope)
    {
        if (null === $envelope) {
            return;
        }

        // Append to the buffer; fetchMessage() hands messages out in arrival order.
        $this->messages[] = $this->buildMessage($envelope);
    }
62
63
    /**
     * Acknowledges a message on the channel.
     *
     * @param Message $message message whose id is used as the delivery tag to acknowledge
     */
    public function messageOk(Message $message)
    {
        $deliveryTag = $message->getId();

        $this->channel->basic_ack($deliveryTag);
    }
67
68
    /**
     * Negatively acknowledges a message on the channel.
     *
     * @param Message $message message whose id is used as the delivery tag to reject
     * @param bool    $requeue forwarded as the requeue flag of basic_nack()
     */
    public function messageKo(Message $message, bool $requeue)
    {
        $deliveryTag = $message->getId();
        // Second argument of basic_nack() (the "multiple" flag in php-amqplib) stays false.
        $multiple = false;

        $this->channel->basic_nack($deliveryTag, $multiple, $requeue);
    }
72
73
    /**
     * Waits on the channel so that pending deliveries reach process().
     *
     * @param bool $blocking when false, the channel is asked to wait in non-blocking mode
     */
    public function waitChannel($blocking = true)
    {
        $nonBlocking = !$blocking;

        $this->channel->wait(null, $nonBlocking);
    }
77
78
    /**
     * Removes and returns the oldest buffered message.
     *
     * @return Message|null the first buffered message, or null when the buffer is empty
     */
    public function fetchMessage()
    {
        $oldest = array_shift($this->messages);

        return $oldest;
    }
82
83
    /**
     * Returns the consumer tag of this provider, generating it on first use.
     *
     * The tag is built from the queue name, the current timestamp and a
     * uniqid() value, then cached so later calls return the same string.
     *
     * @codeCoverageIgnore
     */
    protected function getConsumerTag(): string
    {
        if (null !== $this->consumerTag) {
            return $this->consumerTag;
        }

        $this->consumerTag = sprintf('mqprocess_%s_%s_%s', $this->queue, time(), uniqid());

        return $this->consumerTag;
    }
94
95
    /**
     * Converts an AMQP delivery into the bundle's Message object.
     *
     * Copies every known property that is present on the envelope, flattens
     * the application headers into a plain key => value map under "headers",
     * and uses the delivery tag as the message id.
     *
     * @param AMQPMessage $envelope delivery received from the channel
     *
     * @return Message message carrying the body, the collected properties and the delivery tag
     */
    private function buildMessage(AMQPMessage $envelope): Message
    {
        $knownProperties = [
            'content_type', 'delivery_mode', 'content_encoding', 'type', 'timestamp', 'priority', 'expiration',
            'app_id', 'message_id', 'reply_to', 'correlation_id', 'user_id', 'cluster_id', 'channel', 'consumer_tag',
            'delivery_tag', 'redelivered', 'exchange', 'routing_key',
        ];

        $properties = [];
        foreach ($knownProperties as $name) {
            if (!$envelope->has($name)) {
                continue;
            }

            $properties[$name] = $envelope->get($name);
        }

        $headers = [];
        if ($envelope->has('application_headers')) {
            foreach ($envelope->get('application_headers') as $name => $typedValue) {
                // Only index 1 of each header entry is kept
                // (presumably a [type, value] pair — confirm against php-amqplib's AMQPTable).
                $headers[$name] = $typedValue[1];
            }
        }
        $properties['headers'] = $headers;

        $deliveryTag = $envelope->get('delivery_tag');

        return new Message($envelope->body, $properties, $deliveryTag);
    }
119
}
120