1 | <?php |
||||
2 | |||||
3 | namespace Kunnu\RabbitMQ; |
||||
4 | |||||
5 | use PhpAmqpLib\Wire\AMQPTable; |
||||
6 | use Illuminate\Support\Collection; |
||||
7 | use PhpAmqpLib\Message\AMQPMessage; |
||||
8 | |||||
9 | class RabbitMQConsumer |
||||
10 | { |
||||
11 | /** |
||||
12 | * RabbitMQ Manager. |
||||
13 | * |
||||
14 | * @var RabbitMQManager |
||||
15 | */ |
||||
16 | protected RabbitMQManager $manager; |
||||
17 | |||||
18 | /** |
||||
19 | * Create a new RabbitMQ Publisher instance. |
||||
20 | * |
||||
21 | * @param RabbitMQManager $manager |
||||
22 | */ |
||||
23 | public function __construct(RabbitMQManager $manager) |
||||
24 | { |
||||
25 | $this->manager = $manager; |
||||
26 | } |
||||
27 | |||||
28 | public function consume( |
||||
29 | RabbitMQMessageConsumer $messageConsumer, |
||||
30 | string $bindingKey = '', |
||||
31 | string $connectionName = null, |
||||
32 | ConsumeConfig $consumeConfig = null |
||||
33 | ): void { |
||||
34 | $consumeConfig = $consumeConfig ?? new ConsumeConfig(); |
||||
35 | $defaultConfig = new Collection($this->manager->getConfig()->get(RabbitMQManager::CONFIG_KEY . '.defaults')); |
||||
36 | |||||
37 | $connectionName = $connectionName ?? $this->manager->resolveDefaultConfigName(); |
||||
38 | $connection = $this->manager->resolveConnection(); |
||||
39 | |||||
40 | $channelId = $this->manager->resolveChannelId($consumeConfig->get('channel_id'), $connectionName); |
||||
41 | $channel = $this->manager->resolveChannel($connectionName, $channelId, $connection); |
||||
42 | |||||
43 | $connectionConfig = $this->manager->resolveConfig($connectionName); |
||||
44 | |||||
45 | // Merge/Override default connection configuration with |
||||
46 | // the configuration specified for this consuming. |
||||
47 | if ($consumeConfig->getConnectionConfig()) { |
||||
48 | // consume config > Connection config |
||||
49 | $connectionConfig = $connectionConfig->merge($consumeConfig->getConnectionConfig()); |
||||
50 | } |
||||
51 | |||||
52 | // Merge the exchange properties |
||||
53 | // Consume config > Connection config > Default config |
||||
54 | $exchangeProperties = array_merge( |
||||
55 | $defaultConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Default properties |
||||
56 | $connectionConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Connection properties |
||||
57 | $consumeConfig->get('exchange', ['properties' => []])['properties'] ?? [], // Consume properties |
||||
58 | ); |
||||
59 | |||||
60 | // Merge the exchange config |
||||
61 | // Exchange config > Consume config > Connection config > Default config |
||||
62 | $exchangeConfig = array_merge( |
||||
63 | $defaultConfig->get('exchange', []), // Default config |
||||
64 | $connectionConfig->get('exchange', []), // Connection config |
||||
65 | $consumeConfig->get('exchange', ['properties' => $exchangeProperties]), // Consume config, |
||||
66 | $messageConsumer->getExchange() ? $messageConsumer->getExchange()->getConfig()->toArray() : [], // Exchange config |
||||
67 | ); |
||||
68 | |||||
69 | // Merge message exchange configuration |
||||
70 | if ($messageConsumer->getExchange()) { |
||||
71 | $messageConsumer->getExchange()->setConfig($exchangeConfig); |
||||
72 | $messageConsumer->getExchange()->getConfig()->put('name', $messageConsumer->getExchange()->getName()); |
||||
0 ignored issues
–
show
Bug
introduced
by
![]() $messageConsumer->getExchange()->getName() of type string is incompatible with the type Illuminate\Support\TValue expected by parameter $value of Illuminate\Support\Collection::put() .
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||
73 | } else { |
||||
74 | $messageConsumer->setExchange(new RabbitMQExchange($exchangeConfig['name'] ?? '', $exchangeConfig)); |
||||
75 | } |
||||
76 | |||||
77 | // Merge the queue declare properties |
||||
78 | // Consume config > Connection config > Default config |
||||
79 | $queueDeclareProperties = array_merge( |
||||
80 | $defaultConfig->get('queue', ['declare_properties' => []])['declare_properties'] ?? [], // Default properties |
||||
81 | $connectionConfig->get( |
||||
82 | 'queue', |
||||
83 | ['declare_properties' => []] |
||||
84 | )['declare_properties'] ?? [], // Connection properties |
||||
85 | $consumeConfig->get( |
||||
86 | 'queue', |
||||
87 | ['declare_properties' => []] |
||||
88 | )['declare_properties'] ?? [], // Consume properties |
||||
89 | ); |
||||
90 | |||||
91 | // Merge the queue bind properties |
||||
92 | // Consume config > Connection config > Default config |
||||
93 | $queueBindProperties = array_merge( |
||||
94 | $defaultConfig->get('queue', ['bind_properties' => []])['bind_properties'] ?? [], // Default properties |
||||
95 | $connectionConfig->get('queue', ['bind_properties' => []])['bind_properties'] ?? [], // Connection properties |
||||
96 | $consumeConfig->get('queue', ['bind_properties' => []])['bind_properties'] ?? [], // Consume properties |
||||
97 | ); |
||||
98 | |||||
99 | // Merge the queue declare and bind properties |
||||
100 | $queueProperties = array_merge($queueDeclareProperties, $queueBindProperties); |
||||
101 | |||||
102 | // Merge the queue config |
||||
103 | // Exchange config > Consume config > Connection config > Default config |
||||
104 | $queueConfig = array_merge( |
||||
105 | $defaultConfig->get('queue', []), // Default config |
||||
106 | $connectionConfig->get('queue', []), // Connection config |
||||
107 | $consumeConfig->get('queue', ['properties' => $queueProperties]), // Consume config, |
||||
108 | $messageConsumer->getQueue() ? $messageConsumer->getQueue()->getConfig()->toArray() : [], // Queue config |
||||
109 | ); |
||||
110 | |||||
111 | // Merge message queue configuration |
||||
112 | if ($messageConsumer->getQueue()) { |
||||
113 | $messageConsumer->getQueue()->setConfig($queueConfig); |
||||
114 | $messageConsumer->getQueue()->getConfig()->put('name', $messageConsumer->getQueue()->getName()); |
||||
115 | } else { |
||||
116 | $messageConsumer->setQueue(new RabbitMQQueue($queueConfig['name'] ?? '', $queueConfig)); |
||||
117 | } |
||||
118 | |||||
119 | // Merge the consumer properties |
||||
120 | // Consume config > Connection config > Default config |
||||
121 | $consumerProperties = array_merge( |
||||
122 | $defaultConfig->get('consumer', ['properties' => []])['properties'] ?? [], // Default properties |
||||
123 | $connectionConfig->get('consumer', ['properties' => []])['properties'] ?? [], // Connection properties |
||||
124 | $consumeConfig->get('consumer', ['properties' => []])['properties'] ?? [], // Consume properties |
||||
125 | ); |
||||
126 | |||||
127 | // Merge the consumer config |
||||
128 | // Consumer config > Consume config > Connection config > Default config |
||||
129 | $consumerConfig = new Collection(array_merge( |
||||
0 ignored issues
–
show
array_merge($defaultConf...getConfig()->toArray()) of type array is incompatible with the type Illuminate\Contracts\Support\Arrayable expected by parameter $items of Illuminate\Support\Collection::__construct() .
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||
130 | $defaultConfig->get('consumer', []), // Default config |
||||
131 | $connectionConfig->get('consumer', []), // Connection config |
||||
132 | $consumeConfig->get('consumer', ['properties' => $consumerProperties]), // Consume config, |
||||
133 | $messageConsumer->getConfig()->toArray(), // Consumer config |
||||
134 | )); |
||||
135 | |||||
136 | // Override consumer config with reconciled configuration |
||||
137 | $messageConsumer->setConfig($consumerConfig->toArray()); |
||||
138 | |||||
139 | // Consume config > Connection config > Default config |
||||
140 | $qosConfig = new Collection(array_merge( |
||||
141 | $defaultConfig->get('qos', []), // Default config |
||||
142 | $connectionConfig->get('qos', []), // Connection config |
||||
143 | $consumerConfig->get('qos', []), // Consume config, |
||||
144 | )); |
||||
145 | |||||
146 | /* QoS is not attached to any exchange, queue */ |
||||
147 | if ($qosConfig->get('enabled')) { |
||||
148 | $channel->basic_qos( |
||||
149 | $qosConfig->get('qos_prefetch_size'), |
||||
150 | $qosConfig->get('qos_prefetch_count'), |
||||
151 | $qosConfig->get('qos_a_global') |
||||
152 | ); |
||||
153 | } |
||||
154 | |||||
155 | $exchange = $messageConsumer->getExchange(); |
||||
156 | $exchangeConfig = $exchange->getConfig(); |
||||
157 | |||||
158 | if ($exchangeConfig->get('declare')) { |
||||
159 | $channel->exchange_declare( |
||||
160 | $exchange->getName(), |
||||
161 | $exchangeConfig->get('type'), |
||||
162 | $exchangeConfig->get('passive', false), |
||||
163 | $exchangeConfig->get('durable', true), |
||||
164 | $exchangeConfig->get('auto_delete', false), |
||||
165 | $exchangeConfig->get('internal', false), |
||||
166 | $exchangeConfig->get('nowait', false), |
||||
167 | (new AMQPTable($exchangeConfig->get('properties', [])))->getNativeData() |
||||
168 | ); |
||||
169 | } |
||||
170 | |||||
171 | $queue = $messageConsumer->getQueue(); |
||||
172 | $queueConfig = $queue->getConfig(); |
||||
173 | |||||
174 | if (empty($queueConfig->get('name')) || $queueConfig->get('declare')) { |
||||
175 | if (empty($queueConfig->get('name')) && empty($queue->getName())) { |
||||
176 | $queueConfig->put('nowait', false); |
||||
0 ignored issues
–
show
false of type false is incompatible with the type Illuminate\Support\TValue expected by parameter $value of Illuminate\Support\Collection::put() .
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||
177 | } |
||||
178 | |||||
179 | [$queueName, $messageCount, $consumerCount] = $channel->queue_declare( |
||||
180 | $queue->getName(), |
||||
181 | $queueConfig->get('passive', false), |
||||
182 | $queueConfig->get('durable', true), |
||||
183 | $queueConfig->get('exclusive', true), |
||||
184 | $queueConfig->get('auto_delete', false), |
||||
185 | $queueConfig->get('nowait', false), |
||||
186 | new AMQPTable($queueConfig->get('properties', [])), |
||||
187 | $queueConfig->get('ticket'), |
||||
188 | ); |
||||
189 | |||||
190 | // If the queue name was generated |
||||
191 | if ($queueName !== $queue->getName()) { |
||||
192 | $queue->setName($queueName); |
||||
193 | $queue->getConfig()->put('name', $queueName); |
||||
194 | } |
||||
195 | |||||
196 | $queue->getConfig()->put('messageCount', $messageCount); |
||||
197 | $queue->getConfig()->put('consumerCount', $consumerCount); |
||||
198 | } |
||||
199 | |||||
200 | // No queue can be bound to the default exchange. |
||||
201 | if ($exchange->getName()) { |
||||
202 | $channel->queue_bind( |
||||
203 | $queue->getName(), |
||||
204 | $exchange->getName(), |
||||
205 | $bindingKey, |
||||
206 | $queueConfig->get('nowait', false), |
||||
207 | (new AMQPTable($queueConfig->get('properties.bind_properties', [])))->getNativeData() |
||||
208 | ); |
||||
209 | } |
||||
210 | |||||
211 | // Make an incoming message instance |
||||
212 | $message = new RabbitMQIncomingMessage(); |
||||
213 | |||||
214 | $message |
||||
215 | ->setExchange($messageConsumer->getExchange()) |
||||
216 | ->setQueue($messageConsumer->getQueue()); |
||||
217 | |||||
218 | $callback = function (AMQPMessage $msg) use ($message, $messageConsumer) { |
||||
219 | // Prepare the message for consumption |
||||
220 | $message |
||||
221 | ->setStream($msg->body) |
||||
222 | ->setAmqpMessage($msg) |
||||
223 | ->setDelivery( |
||||
224 | new RabbitMQDelivery([ |
||||
225 | 'body' => $msg->body, |
||||
226 | 'delivery_info' => $msg->delivery_info, |
||||
227 | ]) |
||||
228 | ); |
||||
229 | // Let the consumer handle the message |
||||
230 | $messageConsumer->handle($message); |
||||
231 | }; |
||||
232 | |||||
233 | $channel->basic_consume( |
||||
234 | $queue->getName(), |
||||
235 | $messageConsumer->getConfig()->get('tag', ''), |
||||
236 | $messageConsumer->getConfig()->get('no_local', false), |
||||
237 | $messageConsumer->getConfig()->get('no_ack', false), |
||||
238 | $messageConsumer->getConfig()->get('exclusive', false), |
||||
239 | $messageConsumer->getConfig()->get('nowait', false), |
||||
240 | $callback, |
||||
241 | $messageConsumer->getConfig()->get('ticket', null), |
||||
242 | $messageConsumer->getConfig()->get('parameters', []) |
||||
243 | ); |
||||
244 | |||||
245 | while ($channel->is_consuming()) { |
||||
246 | $channel->wait( |
||||
247 | $consumerConfig->get('wait_allowed_methods'), |
||||
248 | $consumerConfig->get('wait_non_blocking', false), |
||||
249 | $consumerConfig->get('wait_timeout'), |
||||
250 | ); |
||||
251 | |||||
252 | $consumerSleepMs = $consumerConfig->get('consumer_sleep_ms', 0); |
||||
253 | |||||
254 | if ($consumerSleepMs > 0) { |
||||
255 | usleep($consumerSleepMs); |
||||
256 | } |
||||
257 | } |
||||
258 | } |
||||
259 | } |
||||
260 |