1 | <?php |
||||
2 | |||||
3 | namespace OldSound\RabbitMqBundle\DependencyInjection; |
||||
4 | |||||
5 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; |
||||
6 | use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; |
||||
7 | use Symfony\Component\Config\Definition\ConfigurationInterface; |
||||
8 | use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
||||
9 | use Symfony\Component\Config\FileLocator; |
||||
10 | use Symfony\Component\DependencyInjection\ContainerBuilder; |
||||
11 | use Symfony\Component\DependencyInjection\ContainerInterface; |
||||
12 | use Symfony\Component\DependencyInjection\Definition; |
||||
13 | use Symfony\Component\DependencyInjection\Extension\Extension; |
||||
14 | use Symfony\Component\DependencyInjection\Loader\XmlFileLoader; |
||||
15 | use Symfony\Component\DependencyInjection\Reference; |
||||
16 | |||||
17 | /** |
||||
18 | * OldSoundRabbitMqExtension. |
||||
19 | * |
||||
20 | * @author Alvaro Videla |
||||
21 | * @author Marc Weistroff <[email protected]> |
||||
22 | */ |
||||
23 | class OldSoundRabbitMqExtension extends Extension |
||||
24 | { |
||||
25 | /** |
||||
26 | * @var ContainerBuilder |
||||
27 | */ |
||||
28 | private $container; |
||||
29 | |||||
30 | /** |
||||
31 | * @var Boolean Whether the data collector is enabled |
||||
32 | */ |
||||
33 | private $collectorEnabled; |
||||
34 | |||||
35 | private $channelIds = []; |
||||
36 | |||||
37 | private $config = []; |
||||
38 | |||||
39 | 34 | public function load(array $configs, ContainerBuilder $container): void |
|||
40 | { |
||||
41 | 34 | $this->container = $container; |
|||
42 | |||||
43 | 34 | $loader = new XmlFileLoader($this->container, new FileLocator([__DIR__ . '/../Resources/config'])); |
|||
44 | 34 | $loader->load('rabbitmq.xml'); |
|||
45 | |||||
46 | 34 | $configuration = $this->getConfiguration($configs, $container); |
|||
47 | 34 | $this->config = $this->processConfiguration($configuration, $configs); |
|||
48 | |||||
49 | 34 | $this->collectorEnabled = $this->config['enable_collector']; |
|||
50 | |||||
51 | 34 | $this->loadConnections(); |
|||
52 | 34 | $this->loadBindings(); |
|||
53 | 34 | $this->loadProducers(); |
|||
54 | 34 | $this->loadConsumers(); |
|||
55 | 34 | $this->loadMultipleConsumers(); |
|||
56 | 34 | $this->loadDynamicConsumers(); |
|||
57 | 34 | $this->loadBatchConsumers(); |
|||
58 | 34 | $this->loadAnonConsumers(); |
|||
59 | 34 | $this->loadRpcClients(); |
|||
60 | 34 | $this->loadRpcServers(); |
|||
61 | |||||
62 | 34 | if ($this->collectorEnabled && $this->channelIds) { |
|||
0 ignored issues
–
show
|
|||||
63 | 31 | $channels = []; |
|||
64 | 31 | foreach (array_unique($this->channelIds) as $id) { |
|||
65 | 31 | $channels[] = new Reference($id); |
|||
66 | } |
||||
67 | |||||
68 | 31 | $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector'); |
|||
69 | 31 | $definition->replaceArgument(0, $channels); |
|||
70 | } else { |
||||
71 | 3 | $this->container->removeDefinition('old_sound_rabbit_mq.data_collector'); |
|||
72 | } |
||||
73 | 34 | } |
|||
74 | |||||
75 | 34 | public function getConfiguration(array $config, ContainerBuilder $container): ?ConfigurationInterface |
|||
76 | { |
||||
77 | 34 | return new Configuration($this->getAlias()); |
|||
78 | } |
||||
79 | |||||
80 | 34 | protected function loadConnections() |
|||
81 | { |
||||
82 | 34 | foreach ($this->config['connections'] as $key => $connection) { |
|||
83 | 34 | $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class'; |
|||
84 | 34 | $classParam = |
|||
85 | 34 | $connection['lazy'] |
|||
86 | 25 | ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%' |
|||
87 | 34 | : '%old_sound_rabbit_mq.'.$connectionSuffix.'%'; |
|||
88 | |||||
89 | 34 | $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', [ |
|||
90 | 34 | $classParam, $connection, |
|||
91 | ]); |
||||
92 | 34 | if (isset($connection['connection_parameters_provider'])) { |
|||
93 | $definition->addArgument(new Reference($connection['connection_parameters_provider'])); |
||||
94 | unset($connection['connection_parameters_provider']); |
||||
95 | } |
||||
96 | 34 | $definition->setPublic(false); |
|||
97 | 34 | $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key); |
|||
98 | 34 | $this->container->setDefinition($factoryName, $definition); |
|||
99 | |||||
100 | 34 | $definition = new Definition($classParam); |
|||
101 | 34 | if (method_exists($definition, 'setFactory')) { |
|||
102 | // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6 |
||||
103 | 34 | $definition->setFactory([new Reference($factoryName), 'createConnection']); |
|||
104 | } else { |
||||
105 | // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6 |
||||
106 | $definition->setFactoryService($factoryName); |
||||
107 | $definition->setFactoryMethod('createConnection'); |
||||
108 | } |
||||
109 | 34 | $definition->addTag('old_sound_rabbit_mq.connection'); |
|||
110 | 34 | $definition->setPublic(true); |
|||
111 | |||||
112 | 34 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition); |
|||
113 | } |
||||
114 | 34 | } |
|||
115 | |||||
116 | 34 | protected function loadBindings() |
|||
117 | { |
||||
118 | 34 | if ($this->config['sandbox']) { |
|||
119 | return; |
||||
120 | } |
||||
121 | 34 | foreach ($this->config['bindings'] as $binding) { |
|||
122 | 25 | ksort($binding); |
|||
123 | 25 | $definition = new Definition($binding['class']); |
|||
124 | 25 | $definition->addTag('old_sound_rabbit_mq.binding'); |
|||
125 | 25 | $definition->addMethodCall('setArguments', [$binding['arguments']]); |
|||
126 | 25 | $definition->addMethodCall('setDestination', [$binding['destination']]); |
|||
127 | 25 | $definition->addMethodCall('setDestinationIsExchange', [$binding['destination_is_exchange']]); |
|||
128 | 25 | $definition->addMethodCall('setExchange', [$binding['exchange']]); |
|||
129 | 25 | $definition->addMethodCall('isNowait', [$binding['nowait']]); |
|||
130 | 25 | $definition->addMethodCall('setRoutingKey', [$binding['routing_key']]); |
|||
131 | 25 | $this->injectConnection($definition, $binding['connection']); |
|||
132 | 25 | $key = md5(json_encode($binding)); |
|||
133 | 25 | if ($this->collectorEnabled) { |
|||
134 | // in the context of a binding, I don't thing logged channels are needed? |
||||
135 | 25 | $this->injectLoggedChannel($definition, $key, $binding['connection']); |
|||
136 | } |
||||
137 | 25 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition); |
|||
138 | } |
||||
139 | 34 | } |
|||
140 | |||||
141 | 34 | protected function loadProducers() |
|||
142 | { |
||||
143 | 34 | if ($this->config['sandbox'] == false) { |
|||
144 | 34 | foreach ($this->config['producers'] as $key => $producer) { |
|||
145 | 30 | $definition = new Definition($producer['class']); |
|||
146 | 30 | $definition->setPublic(true); |
|||
147 | 30 | $definition->addTag('old_sound_rabbit_mq.base_amqp'); |
|||
148 | 30 | $definition->addTag('old_sound_rabbit_mq.producer'); |
|||
149 | //this producer doesn't define an exchange -> using AMQP Default |
||||
150 | 30 | if (!isset($producer['exchange_options'])) { |
|||
151 | 1 | $producer['exchange_options'] = $this->getDefaultExchangeOptions(); |
|||
152 | } |
||||
153 | 30 | $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($producer['exchange_options'])]); |
|||
154 | //this producer doesn't define a queue -> using AMQP Default |
||||
155 | 30 | if (!isset($producer['queue_options'])) { |
|||
156 | 30 | $producer['queue_options'] = $this->getDefaultQueueOptions(); |
|||
157 | } |
||||
158 | 30 | $definition->addMethodCall('setQueueOptions', [$producer['queue_options']]); |
|||
159 | 30 | $this->injectConnection($definition, $producer['connection']); |
|||
160 | 30 | if ($this->collectorEnabled) { |
|||
161 | 28 | $this->injectLoggedChannel($definition, $key, $producer['connection']); |
|||
162 | } |
||||
163 | 30 | if (!$producer['auto_setup_fabric']) { |
|||
164 | $definition->addMethodCall('disableAutoSetupFabric'); |
||||
165 | } |
||||
166 | |||||
167 | 30 | if ($producer['enable_logger']) { |
|||
168 | $this->injectLogger($definition); |
||||
169 | } |
||||
170 | |||||
171 | 30 | $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key); |
|||
172 | |||||
173 | 30 | $this->container->setDefinition($producerServiceName, $definition); |
|||
174 | 30 | if (null !== $producer['service_alias']) { |
|||
175 | 25 | $this->container->setAlias($producer['service_alias'], $producerServiceName); |
|||
176 | } |
||||
177 | |||||
178 | // register alias for argument auto wiring |
||||
179 | 30 | if (method_exists($this->container, 'registerAliasForArgument')) { |
|||
180 | 30 | $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key; |
|||
181 | 30 | $this->container |
|||
182 | 30 | ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName) |
|||
183 | 30 | ->setPublic(false); |
|||
184 | |||||
185 | 30 | $this->container |
|||
186 | 30 | ->registerAliasForArgument($producerServiceName, $producer['class'], $argName) |
|||
187 | 30 | ->setPublic(false); |
|||
188 | } |
||||
189 | |||||
190 | 30 | $definition->addMethodCall('setDefaultRoutingKey', [$producer['default_routing_key']]); |
|||
191 | 30 | $definition->addMethodCall('setContentType', [$producer['default_content_type']]); |
|||
192 | 30 | $definition->addMethodCall('setDeliveryMode', [$producer['default_delivery_mode']]); |
|||
193 | } |
||||
194 | } else { |
||||
195 | foreach ($this->config['producers'] as $key => $producer) { |
||||
196 | $definition = new Definition('%old_sound_rabbit_mq.fallback.class%'); |
||||
197 | $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key); |
||||
198 | $this->container->setDefinition($producerServiceName, $definition); |
||||
199 | |||||
200 | // register alias for argumen auto wiring |
||||
201 | if (method_exists($this->container, 'registerAliasForArgument')) { |
||||
202 | $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key; |
||||
203 | $this->container |
||||
204 | ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName) |
||||
205 | ->setPublic(false); |
||||
206 | } |
||||
207 | } |
||||
208 | } |
||||
209 | 34 | } |
|||
210 | |||||
211 | 34 | protected function loadConsumers() |
|||
212 | { |
||||
213 | 34 | foreach ($this->config['consumers'] as $key => $consumer) { |
|||
214 | 29 | $definition = new Definition('%old_sound_rabbit_mq.consumer.class%'); |
|||
215 | 29 | $definition->setPublic(true); |
|||
216 | 29 | $definition->addTag('old_sound_rabbit_mq.base_amqp'); |
|||
217 | 29 | $definition->addTag('old_sound_rabbit_mq.consumer'); |
|||
218 | //this consumer doesn't define an exchange -> using AMQP Default |
||||
219 | 29 | if (!isset($consumer['exchange_options'])) { |
|||
220 | $consumer['exchange_options'] = $this->getDefaultExchangeOptions(); |
||||
221 | } |
||||
222 | 29 | $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])]); |
|||
223 | //this consumer doesn't define a queue -> using AMQP Default |
||||
224 | 29 | if (!isset($consumer['queue_options'])) { |
|||
225 | $consumer['queue_options'] = $this->getDefaultQueueOptions(); |
||||
226 | } |
||||
227 | 29 | $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($consumer['queue_options'])]); |
|||
228 | 29 | $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]); |
|||
229 | |||||
230 | 29 | if (array_key_exists('qos_options', $consumer)) { |
|||
231 | 25 | $definition->addMethodCall('setQosOptions', [ |
|||
232 | 25 | $consumer['qos_options']['prefetch_size'], |
|||
233 | 25 | $consumer['qos_options']['prefetch_count'], |
|||
234 | 25 | $consumer['qos_options']['global'], |
|||
235 | ]); |
||||
236 | } |
||||
237 | |||||
238 | 29 | if (isset($consumer['idle_timeout'])) { |
|||
239 | $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]); |
||||
240 | } |
||||
241 | 29 | if (isset($consumer['idle_timeout_exit_code'])) { |
|||
242 | $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]); |
||||
243 | } |
||||
244 | 29 | if (isset($consumer['timeout_wait'])) { |
|||
245 | 25 | $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]); |
|||
246 | } |
||||
247 | 29 | if (isset($consumer['graceful_max_execution'])) { |
|||
248 | $definition->addMethodCall( |
||||
249 | 'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture', |
||||
250 | [$consumer['graceful_max_execution']['timeout']] |
||||
251 | ); |
||||
252 | $definition->addMethodCall( |
||||
253 | 'setGracefulMaxExecutionTimeoutExitCode', |
||||
254 | [$consumer['graceful_max_execution']['exit_code']] |
||||
255 | ); |
||||
256 | } |
||||
257 | 29 | if (!$consumer['auto_setup_fabric']) { |
|||
258 | $definition->addMethodCall('disableAutoSetupFabric'); |
||||
259 | } |
||||
260 | if (isset($consumer['options'])) { |
||||
261 | 29 | $definition->addMethodCall( |
|||
262 | 29 | 'setConsumerOptions', |
|||
263 | 28 | [$this->normalizeArgumentKeys($consumer['options'])] |
|||
264 | ); |
||||
265 | } |
||||
266 | 29 | ||||
267 | 1 | $this->injectConnection($definition, $consumer['connection']); |
|||
268 | if ($this->collectorEnabled) { |
||||
269 | $this->injectLoggedChannel($definition, $key, $consumer['connection']); |
||||
270 | 29 | } |
|||
271 | 29 | ||||
272 | 29 | if ($consumer['enable_logger']) { |
|||
273 | $this->injectLogger($definition); |
||||
274 | } |
||||
275 | 29 | ||||
276 | 29 | $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key); |
|||
277 | 29 | $this->container->setDefinition($name, $definition); |
|||
278 | 29 | $this->addDequeuerAwareCall($consumer['callback'], $name); |
|||
279 | 29 | ||||
280 | // register alias for argument auto wiring |
||||
281 | 29 | if (method_exists($this->container, 'registerAliasForArgument')) { |
|||
282 | 29 | $argName = !str_ends_with(strtolower($key), 'consumer') ? sprintf('%sConsumer', $key) : $key; |
|||
283 | 29 | $this->container |
|||
284 | ->registerAliasForArgument($name, ConsumerInterface::class, $argName) |
||||
285 | ->setPublic(false); |
||||
286 | 34 | ||||
287 | $this->container |
||||
288 | 34 | ->registerAliasForArgument($name, '%old_sound_rabbit_mq.consumer.class%', $argName) |
|||
289 | ->setPublic(false); |
||||
290 | 34 | } |
|||
291 | 25 | } |
|||
292 | 25 | } |
|||
293 | |||||
294 | 25 | protected function loadMultipleConsumers() |
|||
295 | { |
||||
296 | foreach ($this->config['multiple_consumers'] as $key => $consumer) { |
||||
297 | $queues = []; |
||||
298 | $callbacks = []; |
||||
299 | |||||
300 | if (empty($consumer['queues']) && empty($consumer['queues_provider'])) { |
||||
301 | 25 | throw new InvalidConfigurationException( |
|||
302 | 25 | "Error on loading $key multiple consumer. " . |
|||
303 | 25 | "Either 'queues' or 'queues_provider' parameters should be defined." |
|||
304 | 25 | ); |
|||
305 | } |
||||
306 | |||||
307 | 25 | foreach ($consumer['queues'] as $queueName => $queueOptions) { |
|||
308 | $queues[$queueOptions['name']] = $queueOptions; |
||||
309 | 25 | $queues[$queueOptions['name']]['callback'] = [new Reference($queueOptions['callback']), 'execute']; |
|||
310 | 25 | $callbacks[] = $queueOptions['callback']; |
|||
311 | 25 | } |
|||
312 | 25 | ||||
313 | 25 | $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%'); |
|||
314 | $definition |
||||
315 | 25 | ->setPublic(true) |
|||
316 | 25 | ->addTag('old_sound_rabbit_mq.base_amqp') |
|||
317 | 25 | ->addTag('old_sound_rabbit_mq.multi_consumer') |
|||
318 | 25 | ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])]) |
|||
319 | ->addMethodCall('setQueues', [$this->normalizeArgumentKeys($queues)]); |
||||
320 | |||||
321 | if ($consumer['queues_provider']) { |
||||
322 | 25 | $definition->addMethodCall( |
|||
323 | 'setQueuesProvider', |
||||
324 | [new Reference($consumer['queues_provider'])] |
||||
325 | ); |
||||
326 | } |
||||
327 | |||||
328 | if (array_key_exists('qos_options', $consumer)) { |
||||
329 | $definition->addMethodCall('setQosOptions', [ |
||||
330 | 25 | $consumer['qos_options']['prefetch_size'], |
|||
331 | $consumer['qos_options']['prefetch_count'], |
||||
332 | $consumer['qos_options']['global'], |
||||
333 | 25 | ]); |
|||
334 | } |
||||
335 | |||||
336 | 25 | if (isset($consumer['idle_timeout'])) { |
|||
337 | 25 | $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]); |
|||
338 | } |
||||
339 | 25 | if (isset($consumer['idle_timeout_exit_code'])) { |
|||
340 | $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]); |
||||
341 | } |
||||
342 | if (isset($consumer['timeout_wait'])) { |
||||
343 | $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]); |
||||
344 | } |
||||
345 | if (isset($consumer['graceful_max_execution'])) { |
||||
346 | $definition->addMethodCall( |
||||
347 | 'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture', |
||||
348 | [$consumer['graceful_max_execution']['timeout']] |
||||
349 | 25 | ); |
|||
350 | $definition->addMethodCall( |
||||
351 | 'setGracefulMaxExecutionTimeoutExitCode', |
||||
352 | [$consumer['graceful_max_execution']['exit_code']] |
||||
353 | 25 | ); |
|||
354 | 25 | } |
|||
355 | 25 | if (!$consumer['auto_setup_fabric']) { |
|||
356 | $definition->addMethodCall('disableAutoSetupFabric'); |
||||
357 | } |
||||
358 | 25 | if (isset($consumer['options'])) { |
|||
359 | $definition->addMethodCall( |
||||
360 | 'setConsumerOptions', |
||||
361 | [$this->normalizeArgumentKeys($consumer['options'])] |
||||
362 | 25 | ); |
|||
363 | 25 | } |
|||
364 | 25 | ||||
365 | 25 | $this->injectConnection($definition, $consumer['connection']); |
|||
366 | if ($this->collectorEnabled) { |
||||
367 | 25 | $this->injectLoggedChannel($definition, $key, $consumer['connection']); |
|||
368 | 25 | } |
|||
369 | |||||
370 | if ($consumer['enable_logger']) { |
||||
371 | 34 | $this->injectLogger($definition); |
|||
372 | } |
||||
373 | 34 | ||||
374 | $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key); |
||||
375 | 34 | $this->container->setDefinition($name, $definition); |
|||
376 | 25 | if ($consumer['queues_provider']) { |
|||
377 | $this->addDequeuerAwareCall($consumer['queues_provider'], $name); |
||||
378 | } |
||||
379 | foreach ($callbacks as $callback) { |
||||
380 | $this->addDequeuerAwareCall($callback, $name); |
||||
381 | } |
||||
382 | } |
||||
383 | 25 | } |
|||
384 | |||||
385 | 25 | protected function loadDynamicConsumers() |
|||
386 | 25 | { |
|||
387 | 25 | foreach ($this->config['dynamic_consumers'] as $key => $consumer) { |
|||
388 | 25 | if (empty($consumer['queue_options_provider'])) { |
|||
389 | 25 | throw new InvalidConfigurationException( |
|||
390 | 25 | "Error on loading $key dynamic consumer. " . |
|||
391 | "'queue_provider' parameter should be defined." |
||||
392 | 25 | ); |
|||
393 | } |
||||
394 | |||||
395 | $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%'); |
||||
396 | $definition |
||||
397 | ->setPublic(true) |
||||
398 | ->addTag('old_sound_rabbit_mq.base_amqp') |
||||
399 | ->addTag('old_sound_rabbit_mq.consumer') |
||||
400 | 25 | ->addTag('old_sound_rabbit_mq.dynamic_consumer') |
|||
401 | 25 | ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])]) |
|||
402 | 25 | ->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]); |
|||
403 | |||||
404 | if (array_key_exists('qos_options', $consumer)) { |
||||
405 | 25 | $definition->addMethodCall('setQosOptions', [ |
|||
406 | $consumer['qos_options']['prefetch_size'], |
||||
407 | $consumer['qos_options']['prefetch_count'], |
||||
408 | 25 | $consumer['qos_options']['global'], |
|||
409 | ]); |
||||
410 | } |
||||
411 | 25 | ||||
412 | $definition->addMethodCall( |
||||
413 | 'setQueueOptionsProvider', |
||||
414 | 25 | [new Reference($consumer['queue_options_provider'])] |
|||
415 | ); |
||||
416 | |||||
417 | if (isset($consumer['idle_timeout'])) { |
||||
418 | $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]); |
||||
419 | } |
||||
420 | if (isset($consumer['idle_timeout_exit_code'])) { |
||||
421 | $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]); |
||||
422 | } |
||||
423 | if (isset($consumer['timeout_wait'])) { |
||||
424 | 25 | $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]); |
|||
425 | } |
||||
426 | if (isset($consumer['graceful_max_execution'])) { |
||||
427 | $definition->addMethodCall( |
||||
428 | 25 | 'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture', |
|||
429 | 25 | [$consumer['graceful_max_execution']['timeout']] |
|||
430 | 25 | ); |
|||
431 | $definition->addMethodCall( |
||||
432 | 'setGracefulMaxExecutionTimeoutExitCode', |
||||
433 | 25 | [$consumer['graceful_max_execution']['exit_code']] |
|||
434 | ); |
||||
435 | } |
||||
436 | if (!$consumer['auto_setup_fabric']) { |
||||
437 | 25 | $definition->addMethodCall('disableAutoSetupFabric'); |
|||
438 | 25 | } |
|||
439 | 25 | if (isset($consumer['options'])) { |
|||
440 | 25 | $definition->addMethodCall( |
|||
441 | 'setConsumerOptions', |
||||
442 | 34 | [$this->normalizeArgumentKeys($consumer['options'])] |
|||
443 | ); |
||||
444 | 34 | } |
|||
445 | |||||
446 | 34 | $this->injectConnection($definition, $consumer['connection']); |
|||
447 | if ($this->collectorEnabled) { |
||||
448 | $this->injectLoggedChannel($definition, $key, $consumer['connection']); |
||||
449 | } |
||||
450 | |||||
451 | if ($consumer['enable_logger']) { |
||||
452 | $this->injectLogger($definition); |
||||
453 | } |
||||
454 | |||||
455 | $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key); |
||||
456 | $this->container->setDefinition($name, $definition); |
||||
457 | $this->addDequeuerAwareCall($consumer['callback'], $name); |
||||
458 | $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name); |
||||
459 | } |
||||
460 | } |
||||
461 | |||||
462 | protected function loadBatchConsumers() |
||||
463 | { |
||||
464 | foreach ($this->config['batch_consumers'] as $key => $consumer) { |
||||
465 | $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%'); |
||||
466 | |||||
467 | if (!isset($consumer['exchange_options'])) { |
||||
468 | $consumer['exchange_options'] = $this->getDefaultExchangeOptions(); |
||||
469 | } |
||||
470 | |||||
471 | $definition |
||||
472 | ->setPublic(true) |
||||
473 | ->addTag('old_sound_rabbit_mq.base_amqp') |
||||
474 | ->addTag('old_sound_rabbit_mq.batch_consumer') |
||||
475 | ->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]) |
||||
476 | ->addMethodCall('setPrefetchCount', [$consumer['qos_options']['prefetch_count']]) |
||||
477 | ->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']]) |
||||
478 | ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])]) |
||||
479 | ->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($consumer['queue_options'])]) |
||||
480 | ->addMethodCall('setQosOptions', [ |
||||
481 | $consumer['qos_options']['prefetch_size'], |
||||
482 | $consumer['qos_options']['prefetch_count'], |
||||
483 | $consumer['qos_options']['global'], |
||||
484 | ]) |
||||
485 | ; |
||||
486 | |||||
487 | if (isset($consumer['idle_timeout_exit_code'])) { |
||||
488 | $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]); |
||||
489 | } |
||||
490 | |||||
491 | if (isset($consumer['idle_timeout'])) { |
||||
492 | $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]); |
||||
493 | } |
||||
494 | |||||
495 | if (isset($consumer['graceful_max_execution'])) { |
||||
496 | $definition->addMethodCall( |
||||
497 | 'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture', |
||||
498 | [$consumer['graceful_max_execution']['timeout']] |
||||
499 | ); |
||||
500 | } |
||||
501 | |||||
502 | if (!$consumer['auto_setup_fabric']) { |
||||
503 | 34 | $definition->addMethodCall('disableAutoSetupFabric'); |
|||
504 | } |
||||
505 | 34 | ||||
506 | if (isset($consumer['options'])) { |
||||
507 | 34 | $definition->addMethodCall( |
|||
508 | 25 | 'setConsumerOptions', |
|||
509 | [$this->normalizeArgumentKeys($consumer['options'])] |
||||
510 | 25 | ); |
|||
511 | 25 | } |
|||
512 | 25 | ||||
513 | 25 | if ($consumer['keep_alive']) { |
|||
514 | 25 | $definition->addMethodCall('keepAlive'); |
|||
515 | 25 | } |
|||
516 | 25 | ||||
517 | 25 | $this->injectConnection($definition, $consumer['connection']); |
|||
518 | if ($this->collectorEnabled) { |
||||
519 | $this->injectLoggedChannel($definition, $key, $consumer['connection']); |
||||
520 | 25 | } |
|||
521 | 25 | ||||
522 | 25 | if ($consumer['enable_logger']) { |
|||
523 | $this->injectLogger($definition); |
||||
524 | 34 | } |
|||
525 | |||||
526 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition); |
||||
527 | } |
||||
528 | } |
||||
529 | |||||
530 | protected function loadAnonConsumers() |
||||
531 | { |
||||
532 | foreach ($this->config['anon_consumers'] as $key => $anon) { |
||||
533 | $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%'); |
||||
534 | 30 | $definition |
|||
535 | ->setPublic(true) |
||||
536 | 30 | ->addTag('old_sound_rabbit_mq.base_amqp') |
|||
537 | 1 | ->addTag('old_sound_rabbit_mq.anon_consumer') |
|||
538 | ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($anon['exchange_options'])]) |
||||
539 | 1 | ->addMethodCall('setCallback', [[new Reference($anon['callback']), 'execute']]); |
|||
540 | |||||
541 | if (isset($anon['options'])) { |
||||
542 | $definition->addMethodCall( |
||||
543 | 1 | 'setConsumerOptions', |
|||
544 | 1 | [$this->normalizeArgumentKeys($anon['options'])] |
|||
545 | 1 | ); |
|||
546 | } |
||||
547 | |||||
548 | 1 | $this->injectConnection($definition, $anon['connection']); |
|||
549 | if ($this->collectorEnabled) { |
||||
550 | 1 | $this->injectLoggedChannel($definition, $key, $anon['connection']); |
|||
551 | } |
||||
552 | 30 | ||||
553 | $name = sprintf('old_sound_rabbit_mq.%s_anon', $key); |
||||
554 | $this->container->setDefinition($name, $definition); |
||||
555 | $this->addDequeuerAwareCall($anon['callback'], $name); |
||||
556 | } |
||||
557 | } |
||||
558 | |||||
559 | /** |
||||
560 | * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy |
||||
561 | * parameter. So we revert the change for right configurations. |
||||
562 | * |
||||
563 | * @param array $config |
||||
564 | * |
||||
565 | * @return array |
||||
566 | */ |
||||
567 | private function normalizeArgumentKeys(array $config): array |
||||
568 | { |
||||
569 | if (isset($config['arguments'])) { |
||||
570 | $arguments = $config['arguments']; |
||||
571 | // support for old configuration |
||||
572 | if (is_string($arguments)) { |
||||
573 | $arguments = $this->argumentsStringAsArray($arguments); |
||||
0 ignored issues
–
show
The function
OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||
574 | } |
||||
575 | |||||
576 | $newArguments = []; |
||||
577 | foreach ($arguments as $key => $value) { |
||||
578 | if (strstr($key, '_')) { |
||||
579 | 34 | $key = str_replace('_', '-', $key); |
|||
580 | } |
||||
581 | 34 | $newArguments[$key] = $value; |
|||
582 | 28 | } |
|||
583 | 28 | $config['arguments'] = $newArguments; |
|||
584 | } |
||||
585 | 28 | return $config; |
|||
586 | 28 | } |
|||
587 | 28 | ||||
588 | 28 | /** |
|||
589 | 28 | * Support for arguments provided as string. Support for old configuration files. |
|||
590 | * |
||||
591 | 28 | * @deprecated |
|||
592 | 28 | * @param string $arguments |
|||
593 | * @return array |
||||
594 | 28 | */ |
|||
595 | 28 | private function argumentsStringAsArray($arguments): array |
|||
596 | { |
||||
597 | 28 | $argumentsArray = []; |
|||
598 | |||||
599 | 28 | $argumentPairs = explode(',', $arguments); |
|||
600 | foreach ($argumentPairs as $argument) { |
||||
601 | 34 | $argumentPair = explode(':', $argument); |
|||
602 | $type = 'S'; |
||||
603 | 34 | if (isset($argumentPair[2])) { |
|||
604 | $type = $argumentPair[2]; |
||||
605 | 34 | } |
|||
606 | 25 | $argumentsArray[$argumentPair[0]] = [$type, $argumentPair[1]]; |
|||
607 | } |
||||
608 | 25 | ||||
609 | 25 | return $argumentsArray; |
|||
610 | 25 | } |
|||
611 | 25 | ||||
612 | 25 | protected function loadRpcClients() |
|||
613 | 25 | { |
|||
614 | 25 | foreach ($this->config['rpc_clients'] as $key => $client) { |
|||
615 | 25 | $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%'); |
|||
616 | $definition->setLazy($client['lazy']); |
||||
617 | 25 | $definition |
|||
618 | ->addTag('old_sound_rabbit_mq.rpc_client') |
||||
619 | ->addMethodCall('initClient', [$client['expect_serialized_response']]); |
||||
620 | $this->injectConnection($definition, $client['connection']); |
||||
621 | if ($this->collectorEnabled) { |
||||
622 | $this->injectLoggedChannel($definition, $key, $client['connection']); |
||||
623 | } |
||||
624 | 25 | if (array_key_exists('unserializer', $client)) { |
|||
625 | 25 | $definition->addMethodCall('setUnserializer', [$client['unserializer']]); |
|||
626 | } |
||||
627 | 25 | if (array_key_exists('direct_reply_to', $client)) { |
|||
628 | 25 | $definition->addMethodCall('setDirectReplyTo', [$client['direct_reply_to']]); |
|||
629 | } |
||||
630 | 25 | $definition->setPublic(true); |
|||
631 | 25 | ||||
632 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition); |
||||
633 | 25 | } |
|||
634 | } |
||||
635 | 34 | ||||
636 | protected function loadRpcServers() |
||||
637 | 31 | { |
|||
638 | foreach ($this->config['rpc_servers'] as $key => $server) { |
||||
639 | 31 | $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%'); |
|||
640 | 31 | $definition |
|||
641 | ->setPublic(true) |
||||
642 | 31 | ->addTag('old_sound_rabbit_mq.base_amqp') |
|||
643 | 31 | ->addTag('old_sound_rabbit_mq.rpc_server') |
|||
644 | 31 | ->addMethodCall('initServer', [$key]) |
|||
645 | ->addMethodCall('setCallback', [[new Reference($server['callback']), 'execute']]); |
||||
646 | 31 | $this->injectConnection($definition, $server['connection']); |
|||
647 | if ($this->collectorEnabled) { |
||||
648 | 31 | $this->injectLoggedChannel($definition, $key, $server['connection']); |
|||
649 | 31 | } |
|||
650 | 31 | if (array_key_exists('qos_options', $server)) { |
|||
651 | $definition->addMethodCall('setQosOptions', [ |
||||
652 | 33 | $server['qos_options']['prefetch_size'], |
|||
653 | $server['qos_options']['prefetch_count'], |
||||
654 | 33 | $server['qos_options']['global'], |
|||
655 | 33 | ]); |
|||
656 | } |
||||
657 | 34 | if (array_key_exists('exchange_options', $server)) { |
|||
658 | $definition->addMethodCall('setExchangeOptions', [$server['exchange_options']]); |
||||
659 | 34 | } |
|||
660 | if (array_key_exists('queue_options', $server)) { |
||||
661 | $definition->addMethodCall('setQueueOptions', [$server['queue_options']]); |
||||
662 | } |
||||
663 | if (array_key_exists('serializer', $server)) { |
||||
664 | $definition->addMethodCall('setSerializer', [$server['serializer']]); |
||||
665 | } |
||||
666 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition); |
||||
667 | } |
||||
668 | } |
||||
669 | 29 | ||||
670 | protected function injectLoggedChannel(Definition $definition, $name, $connectionName) |
||||
671 | 29 | { |
|||
672 | 29 | $id = sprintf('old_sound_rabbit_mq.channel.%s', $name); |
|||
673 | $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%'); |
||||
674 | $channel |
||||
675 | ->setPublic(false) |
||||
676 | ->addTag('old_sound_rabbit_mq.logged_channel'); |
||||
677 | $this->injectConnection($channel, $connectionName); |
||||
678 | |||||
679 | $this->container->setDefinition($id, $channel); |
||||
680 | |||||
681 | $this->channelIds[] = $id; |
||||
682 | 1 | $definition->addArgument(new Reference($id)); |
|||
683 | } |
||||
684 | 1 | ||||
685 | 1 | protected function injectConnection(Definition $definition, $connectionName) |
|||
686 | { |
||||
687 | 1 | $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName))); |
|||
688 | 1 | } |
|||
689 | |||||
690 | public function getAlias(): string |
||||
691 | { |
||||
692 | return 'old_sound_rabbit_mq'; |
||||
693 | } |
||||
694 | |||||
695 | 1 | /** |
|||
696 | * Add proper dequeuer aware call |
||||
697 | * |
||||
698 | 1 | * @param string $callback |
|||
699 | * @param string $name |
||||
700 | * @throws \ReflectionException |
||||
701 | */ |
||||
702 | protected function addDequeuerAwareCall($callback, $name) |
||||
703 | { |
||||
704 | if (!$this->container->has($callback)) { |
||||
705 | return; |
||||
706 | } |
||||
707 | |||||
708 | $callbackDefinition = $this->container->findDefinition($callback); |
||||
709 | $refClass = new \ReflectionClass($callbackDefinition->getClass()); |
||||
710 | 30 | if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) { |
|||
711 | $callbackDefinition->addMethodCall('setDequeuer', [new Reference($name)]); |
||||
712 | } |
||||
713 | 30 | } |
|||
714 | |||||
715 | private function injectLogger(Definition $definition) |
||||
716 | { |
||||
717 | $definition->addTag('monolog.logger', [ |
||||
718 | 'channel' => 'phpamqplib', |
||||
719 | ]); |
||||
720 | $definition->addMethodCall('setLogger', [new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)]); |
||||
721 | } |
||||
722 | |||||
723 | /** |
||||
724 | * Get default AMQP exchange options |
||||
725 | * |
||||
726 | * @return array |
||||
727 | */ |
||||
728 | protected function getDefaultExchangeOptions(): array |
||||
729 | { |
||||
730 | return [ |
||||
731 | 'name' => '', |
||||
732 | 'type' => 'direct', |
||||
733 | 'passive' => true, |
||||
734 | 'declare' => false, |
||||
735 | ]; |
||||
736 | } |
||||
737 | |||||
738 | /** |
||||
739 | * Get default AMQP queue options |
||||
740 | * |
||||
741 | * @return array |
||||
742 | */ |
||||
743 | protected function getDefaultQueueOptions(): array |
||||
744 | { |
||||
745 | return [ |
||||
746 | 'name' => '', |
||||
747 | 'declare' => false, |
||||
748 | ]; |
||||
749 | } |
||||
750 | } |
||||
751 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.