RabbitMQConsumer   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 247
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 139
c 1
b 0
f 1
dl 0
loc 247
rs 10
wmc 17

2 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 3 1
F consume() 0 228 16
1
<?php
2
3
namespace Kunnu\RabbitMQ;
4
5
use PhpAmqpLib\Wire\AMQPTable;
6
use Illuminate\Support\Collection;
7
use PhpAmqpLib\Message\AMQPMessage;
8
9
/**
 * Declares the exchange/queue topology for a message consumer and runs the
 * blocking consume loop on a RabbitMQ channel.
 */
class RabbitMQConsumer
{
    /**
     * RabbitMQ Manager.
     *
     * @var RabbitMQManager
     */
    protected RabbitMQManager $manager;

    /**
     * Create a new RabbitMQ Consumer instance.
     *
     * @param  RabbitMQManager  $manager
     */
    public function __construct(RabbitMQManager $manager)
    {
        $this->manager = $manager;
    }

    /**
     * Consume messages with the given message consumer.
     *
     * Reconciles the exchange, queue, consumer and QoS configuration from the
     * default config, the connection config, the consume config and the
     * consumer's own exchange/queue objects (later sources override earlier
     * ones), optionally declares the exchange and queue, binds the queue when
     * the exchange is named, registers the consumer callback and then waits on
     * the channel for as long as it reports that it is consuming.
     *
     * @param  RabbitMQMessageConsumer  $messageConsumer  Consumer that handles each incoming message.
     * @param  string  $bindingKey  Routing/binding key used when binding the queue to the exchange.
     * @param  string|null  $connectionName  Connection to use; the manager's default config name when null.
     * @param  ConsumeConfig|null  $consumeConfig  Per-call overrides; an empty ConsumeConfig when null.
     *
     * @return void
     */
    public function consume(
        RabbitMQMessageConsumer $messageConsumer,
        string $bindingKey = '',
        ?string $connectionName = null,
        ?ConsumeConfig $consumeConfig = null
    ): void {
        $consumeConfig = $consumeConfig ?? new ConsumeConfig();
        $defaultConfig = new Collection($this->manager->getConfig()->get(RabbitMQManager::CONFIG_KEY . '.defaults'));

        $connectionName = $connectionName ?? $this->manager->resolveDefaultConfigName();
        // NOTE(review): the connection is resolved without $connectionName while the
        // channel and config below are resolved with it — confirm whether
        // resolveConnection() should receive the connection name as well.
        $connection = $this->manager->resolveConnection();

        $channelId = $this->manager->resolveChannelId($consumeConfig->get('channel_id'), $connectionName);
        $channel = $this->manager->resolveChannel($connectionName, $channelId, $connection);

        $connectionConfig = $this->manager->resolveConfig($connectionName);

        // Merge/Override default connection configuration with
        // the configuration specified for this consuming.
        if ($consumeConfig->getConnectionConfig()) {
            // Consume config > Connection config
            $connectionConfig = $connectionConfig->merge($consumeConfig->getConnectionConfig());
        }

        // Merge the exchange properties
        // Consume config > Connection config > Default config
        $exchangeProperties = array_merge(
            $defaultConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Default properties
            $connectionConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Connection properties
            $consumeConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Consume properties
        );

        // Merge the exchange config
        // Exchange config > Consume config > Connection config > Default config
        $mergedExchangeConfig = array_merge(
            $defaultConfig->get('exchange', []), // Default config
            $connectionConfig->get('exchange', []), // Connection config
            $consumeConfig->get('exchange', ['properties' => $exchangeProperties]), // Consume config
            $messageConsumer->getExchange() ? $messageConsumer->getExchange()->getConfig()->toArray() : [], // Exchange config
        );

        // Apply the reconciled configuration to the consumer's exchange,
        // creating the exchange from the configuration when none was provided.
        if ($messageConsumer->getExchange()) {
            $messageConsumer->getExchange()->setConfig($mergedExchangeConfig);
            // The exchange's own name always wins over a configured name.
            $messageConsumer->getExchange()->getConfig()->put('name', $messageConsumer->getExchange()->getName());
        } else {
            $messageConsumer->setExchange(
                new RabbitMQExchange($mergedExchangeConfig['name'] ?? '', $mergedExchangeConfig)
            );
        }

        // Merge the queue declare properties
        // Consume config > Connection config > Default config
        $queueDeclareProperties = array_merge(
            $defaultConfig->get('queue', ['declare_properties' => []])['declare_properties'] ?? [], // Default properties
            $connectionConfig->get('queue', ['declare_properties' => []])['declare_properties'] ?? [], // Connection properties
            $consumeConfig->get('queue', ['declare_properties' => []])['declare_properties'] ?? [], // Consume properties
        );

        // Merge the queue bind properties
        // Consume config > Connection config > Default config
        $queueBindProperties = array_merge(
            $defaultConfig->get('queue', ['bind_properties' => []])['bind_properties'] ?? [], // Default properties
            $connectionConfig->get('queue', ['bind_properties' => []])['bind_properties'] ?? [], // Connection properties
            $consumeConfig->get('queue', ['bind_properties' => []])['bind_properties'] ?? [], // Consume properties
        );

        // Merge the queue declare and bind properties
        $queueProperties = array_merge($queueDeclareProperties, $queueBindProperties);

        // Merge the queue config
        // Queue config > Consume config > Connection config > Default config
        $mergedQueueConfig = array_merge(
            $defaultConfig->get('queue', []), // Default config
            $connectionConfig->get('queue', []), // Connection config
            $consumeConfig->get('queue', ['properties' => $queueProperties]), // Consume config
            $messageConsumer->getQueue() ? $messageConsumer->getQueue()->getConfig()->toArray() : [], // Queue config
        );

        // Apply the reconciled configuration to the consumer's queue,
        // creating the queue from the configuration when none was provided.
        if ($messageConsumer->getQueue()) {
            $messageConsumer->getQueue()->setConfig($mergedQueueConfig);
            // The queue's own name always wins over a configured name.
            $messageConsumer->getQueue()->getConfig()->put('name', $messageConsumer->getQueue()->getName());
        } else {
            $messageConsumer->setQueue(new RabbitMQQueue($mergedQueueConfig['name'] ?? '', $mergedQueueConfig));
        }

        // Merge the consumer properties
        // Consume config > Connection config > Default config
        $consumerProperties = array_merge(
            $defaultConfig->get('consumer', ['properties' => []])['properties'] ?? [], // Default properties
            $connectionConfig->get('consumer', ['properties' => []])['properties'] ?? [], // Connection properties
            $consumeConfig->get('consumer', ['properties' => []])['properties'] ?? [], // Consume properties
        );

        // Merge the consumer config
        // Consumer config > Consume config > Connection config > Default config
        $consumerConfig = new Collection(array_merge(
            $defaultConfig->get('consumer', []), // Default config
            $connectionConfig->get('consumer', []), // Connection config
            $consumeConfig->get('consumer', ['properties' => $consumerProperties]), // Consume config
            $messageConsumer->getConfig()->toArray(), // Consumer config
        ));

        // Override consumer config with reconciled configuration
        $messageConsumer->setConfig($consumerConfig->toArray());

        // Consumer config > Connection config > Default config
        $qosConfig = new Collection(array_merge(
            $defaultConfig->get('qos', []), // Default config
            $connectionConfig->get('qos', []), // Connection config
            $consumerConfig->get('qos', []), // Reconciled consumer config
        ));

        /* QoS is not attached to any exchange, queue */
        if ($qosConfig->get('enabled')) {
            $channel->basic_qos(
                $qosConfig->get('qos_prefetch_size'),
                $qosConfig->get('qos_prefetch_count'),
                $qosConfig->get('qos_a_global')
            );
        }

        $exchange = $messageConsumer->getExchange();
        $exchangeConfig = $exchange->getConfig();

        if ($exchangeConfig->get('declare')) {
            $channel->exchange_declare(
                $exchange->getName(),
                $exchangeConfig->get('type'),
                $exchangeConfig->get('passive', false),
                $exchangeConfig->get('durable', true),
                $exchangeConfig->get('auto_delete', false),
                $exchangeConfig->get('internal', false),
                $exchangeConfig->get('nowait', false),
                (new AMQPTable($exchangeConfig->get('properties', [])))->getNativeData()
            );
        }

        $queue = $messageConsumer->getQueue();
        $queueConfig = $queue->getConfig();

        // A queue without a configured name is always declared so that the
        // name returned by the declaration can be used.
        if (empty($queueConfig->get('name')) || $queueConfig->get('declare')) {
            if (empty($queueConfig->get('name')) && empty($queue->getName())) {
                // The declare result is needed to learn the queue name,
                // so the declaration must not be fire-and-forget.
                $queueConfig->put('nowait', false);
            }

            $declared = $channel->queue_declare(
                $queue->getName(),
                $queueConfig->get('passive', false),
                $queueConfig->get('durable', true),
                $queueConfig->get('exclusive', true),
                $queueConfig->get('auto_delete', false),
                $queueConfig->get('nowait', false),
                new AMQPTable($queueConfig->get('properties', [])),
                $queueConfig->get('ticket'),
            );

            // queue_declare() yields no result when `nowait` is enabled; in
            // that case keep the configured name instead of overwriting it
            // (and the counters) with null.
            if (is_array($declared)) {
                [$queueName, $messageCount, $consumerCount] = $declared;

                // If the queue name was generated
                if ($queueName !== $queue->getName()) {
                    $queue->setName($queueName);
                    $queue->getConfig()->put('name', $queueName);
                }

                $queue->getConfig()->put('messageCount', $messageCount);
                $queue->getConfig()->put('consumerCount', $consumerCount);
            }
        }

        // No queue can be bound to the default exchange.
        if ($exchange->getName()) {
            // NOTE(review): Collection::get() looks up the literal key
            // 'properties.bind_properties' (no dot-notation traversal), so this
            // presumably always falls back to [] — confirm the intended source
            // of the bind arguments.
            $channel->queue_bind(
                $queue->getName(),
                $exchange->getName(),
                $bindingKey,
                $queueConfig->get('nowait', false),
                (new AMQPTable($queueConfig->get('properties.bind_properties', [])))->getNativeData()
            );
        }

        // Make an incoming message instance
        $message = new RabbitMQIncomingMessage();

        $message
            ->setExchange($messageConsumer->getExchange())
            ->setQueue($messageConsumer->getQueue());

        // The same incoming message instance is reused (re-populated) for
        // every delivery handed to the consumer.
        $callback = function (AMQPMessage $msg) use ($message, $messageConsumer) {
            // Prepare the message for consumption
            $message
                ->setStream($msg->body)
                ->setAmqpMessage($msg)
                ->setDelivery(
                    new RabbitMQDelivery([
                        'body' => $msg->body,
                        // NOTE(review): static analysis reports AMQPMessage::$delivery_info as deprecated.
                        'delivery_info' => $msg->delivery_info,
                    ])
                );

            // Let the consumer handle the message
            $messageConsumer->handle($message);
        };

        $channel->basic_consume(
            $queue->getName(),
            $messageConsumer->getConfig()->get('tag', ''),
            $messageConsumer->getConfig()->get('no_local', false),
            $messageConsumer->getConfig()->get('no_ack', false),
            $messageConsumer->getConfig()->get('exclusive', false),
            $messageConsumer->getConfig()->get('nowait', false),
            $callback,
            $messageConsumer->getConfig()->get('ticket', null),
            $messageConsumer->getConfig()->get('parameters', [])
        );

        // The pause between waits is configured in milliseconds, whereas
        // usleep() expects microseconds.
        $consumerSleepMicroseconds = (int) $consumerConfig->get('consumer_sleep_ms', 0) * 1000;

        while ($channel->is_consuming()) {
            $channel->wait(
                $consumerConfig->get('wait_allowed_methods'),
                $consumerConfig->get('wait_non_blocking', false),
                $consumerConfig->get('wait_timeout'),
            );

            if ($consumerSleepMicroseconds > 0) {
                usleep($consumerSleepMicroseconds);
            }
        }
    }
}
260