Passed
Branch master (74fab5)
by Kunal
03:19
created

RabbitMQConsumer   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 241
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 136
c 1
b 0
f 1
dl 0
loc 241
rs 10
wmc 17

2 Methods

Rating   Name   Duplication   Size   Complexity  
F consume() 0 222 16
A __construct() 0 3 1
1
<?php
2
3
namespace Kunnu\RabbitMQ;
4
5
use PhpAmqpLib\Wire\AMQPTable;
6
use Illuminate\Support\Collection;
7
use PhpAmqpLib\Message\AMQPMessage;
8
9
/**
 * Consumes messages from a RabbitMQ queue on behalf of a message consumer.
 *
 * Exchange, queue, consumer and QoS settings are reconciled from the package
 * defaults, the connection configuration and the per-call consume
 * configuration before anything is declared on the channel.
 */
class RabbitMQConsumer
{
    /**
     * RabbitMQ Manager.
     *
     * @var RabbitMQManager $manager
     */
    protected RabbitMQManager $manager;

    /**
     * Create a new RabbitMQ Consumer instance.
     *
     * @param RabbitMQManager $manager
     */
    public function __construct(RabbitMQManager $manager)
    {
        $this->manager = $manager;
    }

    /**
     * Set up the exchange/queue for the given consumer and start consuming.
     *
     * The method reconciles configuration, optionally applies QoS, optionally
     * declares the exchange and the queue, binds the queue to the exchange
     * (when the exchange has a name), registers the consumer callback and then
     * keeps waiting on the channel for as long as it reports it is consuming.
     *
     * @param RabbitMQMessageConsumer $messageConsumer Consumer whose handle() receives each incoming message.
     * @param string                  $bindingKey      Routing key used when binding the queue to the exchange.
     * @param string|null             $connectionName  Connection to use; the manager's default name when null.
     * @param ConsumeConfig|null      $consumeConfig   Per-call overrides; an empty ConsumeConfig when null.
     *
     * @return void
     */
    public function consume(
        RabbitMQMessageConsumer $messageConsumer,
        string $bindingKey = '',
        ?string $connectionName = null,
        ?ConsumeConfig $consumeConfig = null
    ): void {
        $consumeConfig ??= new ConsumeConfig();
        $defaultConfig = new Collection($this->manager->getConfig()->get(RabbitMQManager::CONFIG_KEY . ".defaults"));

        $connectionName = $connectionName ?? $this->manager->resolveDefaultConfigName();
        // NOTE(review): resolveConnection() is called without $connectionName, while the
        // channel and the configuration below are resolved for $connectionName — confirm
        // the manager returns the intended connection for non-default names.
        $connection = $this->manager->resolveConnection();

        $channelId = $this->manager->resolveChannelId($consumeConfig->get("channel_id"), $connectionName);
        $channel = $this->manager->resolveChannel($connectionName, $channelId, $connection);

        $connectionConfig = $this->manager->resolveConfig($connectionName);

        // Merge/Override default connection configuration with
        // the configuration specified for this consuming.
        if ($consumeConfig->getConnectionConfig()) {
            // consume config > Connection config
            $connectionConfig = $connectionConfig->merge($consumeConfig->getConnectionConfig());
        }

        // Merge the exchange properties
        // Consume config > Connection config > Default config
        $exchangeProperties = array_merge(
            $defaultConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Default properties
            $connectionConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Connection properties
            $consumeConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Consume properties
        );

        // Merge the exchange config
        // Exchange config > Consume config > Connection config > Default config
        // NOTE(review): the merged properties above are only used as the fallback value
        // when the consume config has no 'exchange' entry (the queue and consumer merges
        // below follow the same pattern) — confirm this is intended.
        $exchangeConfig = array_merge(
            $defaultConfig->get('exchange', []), // Default config
            $connectionConfig->get('exchange', []), // Connection config
            $consumeConfig->get('exchange', ['properties' => $exchangeProperties]), // Consume config,
            $messageConsumer->getExchange() ? $messageConsumer->getExchange()->getConfig()->toArray() : [], // Exchange config
        );

        // Merge message exchange configuration
        if ($messageConsumer->getExchange()) {
            $messageConsumer->getExchange()->setConfig($exchangeConfig);
            // Re-assert the exchange's own name after its config has been replaced.
            $messageConsumer->getExchange()->getConfig()->put('name', $messageConsumer->getExchange()->getName());
        } else {
            $messageConsumer->setExchange(new RabbitMQExchange($exchangeConfig['name'] ?? '', $exchangeConfig));
        }

        // Merge the queue declare properties
        // Consume config > Connection config > Default config
        $queueDeclareProperties = array_merge(
            $defaultConfig->get('queue', ['declare_properties' => []])['declare_properties'] ?? [], // Default properties
            $connectionConfig->get(
                'queue',
                ['declare_properties' => []]
            )['declare_properties'] ?? [], // Connection properties
            $consumeConfig->get(
                'queue',
                ['declare_properties' => []]
            )['declare_properties'] ?? [], // Consume properties
        );

        // Merge the queue bind properties
        // Consume config > Connection config > Default config
        $queueBindProperties = array_merge(
            $defaultConfig->get('queue', ['bind_properties' => []])['bind_properties'] ?? [], // Default properties
            $connectionConfig->get('queue', ['bind_properties' => []])['bind_properties'] ?? [], // Connection properties
            $consumeConfig->get('queue', ['bind_properties' => []])['bind_properties'] ?? [], // Consume properties
        );

        // Merge the queue declare and bind properties
        $queueProperties = array_merge($queueDeclareProperties, $queueBindProperties);

        // Merge the queue config
        // Queue config > Consume config > Connection config > Default config
        $queueConfig = array_merge(
            $defaultConfig->get('queue', []), // Default config
            $connectionConfig->get('queue', []), // Connection config
            $consumeConfig->get('queue', ['properties' => $queueProperties]), // Consume config,
            $messageConsumer->getQueue() ? $messageConsumer->getQueue()->getConfig()->toArray() : [], // Queue config
        );

        // Merge message queue configuration
        if ($messageConsumer->getQueue()) {
            $messageConsumer->getQueue()->setConfig($queueConfig);
            // Re-assert the queue's own name after its config has been replaced.
            $messageConsumer->getQueue()->getConfig()->put('name', $messageConsumer->getQueue()->getName());
        } else {
            $messageConsumer->setQueue(new RabbitMQQueue($queueConfig['name'] ?? '', $queueConfig));
        }

        // Merge the consumer properties
        // Consume config > Connection config > Default config
        $consumerProperties = array_merge(
            $defaultConfig->get('consumer', ['properties' => []])['properties'] ?? [], // Default properties
            $connectionConfig->get('consumer', ['properties' => []])['properties'] ?? [], // Connection properties
            $consumeConfig->get('consumer', ['properties' => []])['properties'] ?? [], // Consume properties
        );

        // Merge the consumer config
        // Consumer config > Consume config > Connection config > Default config
        $consumerConfig = array_merge(
            $defaultConfig->get('consumer', []), // Default config
            $connectionConfig->get('consumer', []), // Connection config
            $consumeConfig->get('consumer', ['properties' => $consumerProperties]), // Consume config,
            $messageConsumer->getConfig()->toArray(), // Consumer config
        );

        // Override consumer config with reconciled configuration
        $messageConsumer->setConfig($consumerConfig);

        // Consume config > Connection config > Default config
        $qosConfig = new Collection(array_merge(
            $defaultConfig->get('qos', []), // Default config
            $connectionConfig->get('qos', []), // Connection config
            $consumeConfig->get('qos', []), // Consume config,
        ));

        /* QoS is not attached to any exchange, queue */
        if ($qosConfig->get('enabled')) {
            $channel->basic_qos(
                $qosConfig->get('qos_prefetch_size'),
                $qosConfig->get('qos_prefetch_count'),
                $qosConfig->get('qos_a_global')
            );
        }

        $exchange = $messageConsumer->getExchange();
        $exchangeConfig = $exchange->getConfig();

        if ($exchangeConfig->get('declare')) {
            $channel->exchange_declare(
                $exchange->getName(),
                $exchangeConfig->get('type'),
                $exchangeConfig->get('passive', false),
                $exchangeConfig->get('durable', true),
                $exchangeConfig->get('auto_delete', false),
                $exchangeConfig->get('internal', false),
                $exchangeConfig->get('nowait', false),
                new AMQPTable($exchangeConfig->get('properties', []))
            );
        }

        $queue = $messageConsumer->getQueue();
        $queueConfig = $queue->getConfig();

        // Declare the queue when it has no configured name or when declaring is requested.
        if (empty($queueConfig->get('name')) || $queueConfig->get('declare')) {
            $declared = $channel->queue_declare(
                $queue->getName(),
                $queueConfig->get('passive', false),
                $queueConfig->get('durable', true),
                $queueConfig->get('exclusive', true),
                $queueConfig->get('auto_delete', false),
                $queueConfig->get('nowait', false),
                new AMQPTable($queueConfig->get('properties', [])),
                $queueConfig->get('ticket'),
            );

            // php-amqplib documents queue_declare() as returning array|null (null when
            // nowait is set); only read the declare-ok triple when there is one, so a
            // null name is never pushed onto the queue.
            if (is_array($declared)) {
                [$queueName, $messageCount, $consumerCount] = $declared;

                // If the queue name was generated
                if ($queueName !== $queue->getName()) {
                    $queue->setName($queueName);
                    $queue->getConfig()->put('name', $queueName);
                }

                $queue->getConfig()->put('messageCount', $messageCount);
                $queue->getConfig()->put('consumerCount', $consumerCount);
            }
        }

        // No queue can be bound to the default exchange.
        if ($exchange->getName()) {
            // NOTE(review): confirm the queue config object resolves the dotted
            // 'properties.bind_properties' key; a plain Illuminate Collection::get()
            // treats it as a single literal key.
            $channel->queue_bind(
                $queue->getName(),
                $exchange->getName(),
                $bindingKey,
                $queueConfig->get('nowait', false),
                new AMQPTable($queueConfig->get('properties.bind_properties', []))
            );
        }

        // Make an incoming message instance
        $message = new RabbitMQIncomingMessage();

        $message
            ->setExchange($messageConsumer->getExchange())
            ->setQueue($messageConsumer->getQueue());

        // The same $message instance is re-populated for every delivery.
        $callback = function (AMQPMessage $msg) use ($message, $messageConsumer) {
            // Prepare the message for consumption
            $message
                ->setStream($msg->body)
                ->setAmqpMessage($msg)
                ->setDelivery(
                    new RabbitMQDelivery([
                        'body' => $msg->body,
                        'delivery_info' => $msg->delivery_info,
                    ])
                );
            // Let the consumer handle the message
            $messageConsumer->handle($message);
        };

        $channel->basic_consume(
            $queue->getName(),
            $messageConsumer->getConfig()->get('tag', ''),
            $messageConsumer->getConfig()->get('no_local', false),
            $messageConsumer->getConfig()->get('no_ack', false),
            $messageConsumer->getConfig()->get('exclusive', false),
            $messageConsumer->getConfig()->get('nowait', false),
            $callback,
            $messageConsumer->getConfig()->get('ticket', null),
            $messageConsumer->getConfig()->get('parameters', [])
        );

        // Keep dispatching deliveries for as long as the channel has active consumers.
        while ($channel->is_consuming()) {
            $channel->wait(
                $consumeConfig->get('wait_allowed_methods'),
                $consumeConfig->get('wait_non_blocking', false),
                $consumeConfig->get('wait_timeout'),
            );
        }
    }
}
254