1 | <?php |
||||||
2 | |||||||
3 | namespace OldSound\RabbitMqBundle\DependencyInjection; |
||||||
4 | |||||||
5 | use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; |
||||||
6 | use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; |
||||||
7 | use Symfony\Component\Config\Definition\ConfigurationInterface; |
||||||
8 | use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
||||||
9 | use Symfony\Component\Config\FileLocator; |
||||||
10 | use Symfony\Component\DependencyInjection\ContainerBuilder; |
||||||
11 | use Symfony\Component\DependencyInjection\ContainerInterface; |
||||||
12 | use Symfony\Component\DependencyInjection\Definition; |
||||||
13 | use Symfony\Component\DependencyInjection\Extension\Extension; |
||||||
14 | use Symfony\Component\DependencyInjection\Loader\XmlFileLoader; |
||||||
15 | use Symfony\Component\DependencyInjection\Reference; |
||||||
16 | |||||||
17 | /** |
||||||
18 | * OldSoundRabbitMqExtension. |
||||||
19 | * |
||||||
20 | * @author Alvaro Videla |
||||||
21 | * @author Marc Weistroff <[email protected]> |
||||||
22 | */ |
||||||
23 | class OldSoundRabbitMqExtension extends Extension |
||||||
24 | { |
||||||
25 | /** |
||||||
26 | * @var ContainerBuilder |
||||||
27 | */ |
||||||
28 | private $container; |
||||||
29 | |||||||
30 | /** |
||||||
31 | * @var Boolean Whether the data collector is enabled |
||||||
32 | */ |
||||||
33 | private $collectorEnabled; |
||||||
34 | |||||||
35 | private $channelIds = []; |
||||||
36 | |||||||
37 | private $config = []; |
||||||
38 | |||||||
/**
 * Loads the bundle's service definitions into the container.
 *
 * Imports rabbitmq.xml, processes the merged configuration and then registers
 * connections, bindings, producers, every consumer flavour and the RPC
 * clients/servers. Finally the data collector definition is either wired with
 * the collected channel references or removed from the container.
 *
 * @param array            $configs   Raw configuration arrays to be processed
 * @param ContainerBuilder $container Container receiving the definitions
 */
public function load(array $configs, ContainerBuilder $container): void
{
    $this->container = $container;

    $locator = new FileLocator([__DIR__ . '/../Resources/config']);
    $xmlLoader = new XmlFileLoader($this->container, $locator);
    $xmlLoader->load('rabbitmq.xml');

    $this->config = $this->processConfiguration(
        $this->getConfiguration($configs, $container),
        $configs
    );
    $this->collectorEnabled = $this->config['enable_collector'];

    // The loaders below run in a fixed order; connections come first.
    $this->loadConnections();
    $this->loadBindings();
    $this->loadProducers();
    $this->loadConsumers();
    $this->loadMultipleConsumers();
    $this->loadDynamicConsumers();
    $this->loadBatchConsumers();
    $this->loadAnonConsumers();
    $this->loadRpcClients();
    $this->loadRpcServers();

    // Without an enabled collector, or without any recorded channel id,
    // the data collector service is dropped altogether.
    if (!$this->collectorEnabled || !$this->channelIds) {
        $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');

        return;
    }

    // De-duplicate the recorded channel ids and hand them to the collector
    // as a plain list of service references.
    $channels = array_map(
        static function ($id) {
            return new Reference($id);
        },
        array_values(array_unique($this->channelIds))
    );

    $this->container
        ->getDefinition('old_sound_rabbit_mq.data_collector')
        ->replaceArgument(0, $channels);
}
|||||
74 | |||||||
/**
 * Builds the configuration definition for this extension.
 *
 * Both parameters are unused here; the Configuration object is built from
 * the extension alias only.
 *
 * @param array            $config    Unused
 * @param ContainerBuilder $container Unused
 *
 * @return ConfigurationInterface|null
 */
public function getConfiguration(array $config, ContainerBuilder $container): ?ConfigurationInterface
{
    $alias = $this->getAlias();

    return new Configuration($alias);
}
||||||
79 | |||||||
/**
 * Registers, for every configured connection, a private connection factory
 * service and a public connection service created through that factory.
 *
 * Service ids:
 *  - old_sound_rabbit_mq.connection_factory.<key> (private)
 *  - old_sound_rabbit_mq.connection.<key>         (public, tagged
 *    "old_sound_rabbit_mq.connection")
 */
protected function loadConnections()
{
    foreach ($this->config['connections'] as $key => $connection) {
        // Pick the class parameter: socket vs. stream connection, lazy vs. eager.
        $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
        $classParam = $connection['lazy']
            ? '%old_sound_rabbit_mq.lazy.' . $connectionSuffix . '%'
            : '%old_sound_rabbit_mq.' . $connectionSuffix . '%';

        // The factory receives the connection class and the full connection
        // configuration array (including "connection_parameters_provider"
        // when that key is set).
        $factoryDefinition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', [
            $classParam, $connection,
        ]);
        if (isset($connection['connection_parameters_provider'])) {
            // Optional third constructor argument: the parameters provider service.
            $factoryDefinition->addArgument(new Reference($connection['connection_parameters_provider']));
        }
        $factoryDefinition->setPublic(false);

        $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
        $this->container->setDefinition($factoryName, $factoryDefinition);

        // The connection itself is produced by the factory's createConnection().
        // The former setFactoryService()/setFactoryMethod() fallback was removed:
        // those methods no longer exist on Definition, so the branch was dead code.
        $connectionDefinition = new Definition($classParam);
        $connectionDefinition->setFactory([new Reference($factoryName), 'createConnection']);
        $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
        $connectionDefinition->setPublic(true);

        $this->container->setDefinition(
            sprintf('old_sound_rabbit_mq.connection.%s', $key),
            $connectionDefinition
        );
    }
}
|||||
115 | |||||||
/**
 * Registers one service per configured binding, unless sandbox mode is on.
 *
 * The service id is old_sound_rabbit_mq.binding.<hash>, where the hash is the
 * md5 of the JSON-encoded, key-sorted binding configuration.
 */
protected function loadBindings()
{
    if ($this->config['sandbox']) {
        return;
    }

    // Method name => configuration key, in the order the calls are registered.
    // NOTE(review): "isNowait" reads like a getter while every other entry is
    // a setter - confirm against the binding class whether "setNowait" was
    // intended. Kept as-is to preserve behaviour.
    $methodCalls = [
        'setArguments'             => 'arguments',
        'setDestination'           => 'destination',
        'setDestinationIsExchange' => 'destination_is_exchange',
        'setExchange'              => 'exchange',
        'isNowait'                 => 'nowait',
        'setRoutingKey'            => 'routing_key',
    ];

    foreach ($this->config['bindings'] as $binding) {
        // Sorting the keys makes the hash below independent of key order.
        ksort($binding);

        $definition = new Definition($binding['class']);
        $definition->addTag('old_sound_rabbit_mq.binding');
        foreach ($methodCalls as $method => $option) {
            $definition->addMethodCall($method, [$binding[$option]]);
        }
        $this->injectConnection($definition, $binding['connection']);

        $hash = md5(json_encode($binding));
        if ($this->collectorEnabled) {
            // NOTE(review): it is unclear whether a logged channel is really
            // needed for a binding; the original author raised the same doubt.
            $this->injectLoggedChannel($definition, $hash, $binding['connection']);
        }

        $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $hash), $definition);
    }
}
|||||
140 | |||||||
/**
 * Registers one service per configured producer under the id
 * old_sound_rabbit_mq.<key>_producer.
 *
 * In sandbox mode each producer is replaced by the fallback class and only
 * the ProducerInterface autowiring alias is registered. Otherwise the
 * configured producer class is fully wired (exchange/queue options,
 * connection, optional logged channel and logger, aliases, defaults).
 */
protected function loadProducers()
{
    $sandboxed = (bool) $this->config['sandbox'];
    $supportsArgumentAliases = method_exists($this->container, 'registerAliasForArgument');

    foreach ($this->config['producers'] as $key => $producer) {
        $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);

        if ($sandboxed) {
            $this->container->setDefinition(
                $producerServiceName,
                new Definition('%old_sound_rabbit_mq.fallback.class%')
            );

            // Register the alias used for argument autowiring.
            if ($supportsArgumentAliases) {
                $argName = str_ends_with(strtolower($key), 'producer') ? $key : sprintf('%sProducer', $key);
                $this->container
                    ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName)
                    ->setPublic(false);
            }

            continue;
        }

        $definition = new Definition($producer['class']);
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.producer');

        // No exchange configured for this producer: fall back to the defaults.
        $exchangeOptions = $producer['exchange_options'] ?? $this->getDefaultExchangeOptions();
        $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

        // No queue configured for this producer: fall back to the defaults.
        $queueOptions = $producer['queue_options'] ?? $this->getDefaultQueueOptions();
        $definition->addMethodCall('setQueueOptions', [$queueOptions]);

        $this->injectConnection($definition, $producer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $producer['connection']);
        }

        if (!$producer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
        if ($producer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $this->container->setDefinition($producerServiceName, $definition);
        if (null !== $producer['service_alias']) {
            $this->container->setAlias($producer['service_alias'], $producerServiceName);
        }

        // Register the aliases used for argument autowiring: one for the
        // interface, one for the configured producer class.
        if ($supportsArgumentAliases) {
            $argName = str_ends_with(strtolower($key), 'producer') ? $key : sprintf('%sProducer', $key);
            foreach ([ProducerInterface::class, $producer['class']] as $aliasedType) {
                $this->container
                    ->registerAliasForArgument($producerServiceName, $aliasedType, $argName)
                    ->setPublic(false);
            }
        }

        // These calls are appended after the definition has been registered;
        // the definition object is shared, so they still take effect.
        $definition->addMethodCall('setDefaultRoutingKey', [$producer['default_routing_key']]);
        $definition->addMethodCall('setContentType', [$producer['default_content_type']]);
        $definition->addMethodCall('setDeliveryMode', [$producer['default_delivery_mode']]);
    }
}
|||||
210 | |||||||
/**
 * Registers one service per configured consumer under the id
 * old_sound_rabbit_mq.<key>_consumer.
 *
 * Exchange and queue options fall back to the defaults when missing; QoS,
 * timeouts, graceful max execution and consumer options are only applied
 * when present in the configuration.
 */
protected function loadConsumers()
{
    // Configuration key => setter, applied only when the key is set.
    $optionalSetters = [
        'idle_timeout'           => 'setIdleTimeout',
        'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
        'timeout_wait'           => 'setTimeoutWait',
    ];

    foreach ($this->config['consumers'] as $key => $consumer) {
        $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
        $definition
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.consumer');

        // No exchange configured for this consumer: fall back to the defaults.
        $exchangeOptions = $consumer['exchange_options'] ?? $this->getDefaultExchangeOptions();
        $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

        // No queue configured for this consumer: fall back to the defaults.
        $queueOptions = $consumer['queue_options'] ?? $this->getDefaultQueueOptions();
        $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($queueOptions)]);

        $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

        if (array_key_exists('qos_options', $consumer)) {
            $qos = $consumer['qos_options'];
            $definition->addMethodCall(
                'setQosOptions',
                [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
            );
        }

        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, [$consumer[$option]]);
            }
        }

        if (isset($consumer['graceful_max_execution'])) {
            $graceful = $consumer['graceful_max_execution'];
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                [$graceful['timeout']]
            );
            $definition->addMethodCall('setGracefulMaxExecutionTimeoutExitCode', [$graceful['exit_code']]);
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
        if (isset($consumer['options'])) {
            $definition->addMethodCall(
                'setConsumerOptions',
                [$this->normalizeArgumentKeys($consumer['options'])]
            );
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }
        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
        $this->container->setDefinition($name, $definition);
        $this->addDequeuerAwareCall($consumer['callback'], $name);

        // Register the aliases used for argument autowiring.
        if (method_exists($this->container, 'registerAliasForArgument')) {
            $argName = str_ends_with(strtolower($key), 'consumer') ? $key : sprintf('%sConsumer', $key);
            foreach ([ConsumerInterface::class, '%old_sound_rabbit_mq.consumer.class%'] as $aliasedType) {
                $this->container
                    ->registerAliasForArgument($name, $aliasedType, $argName)
                    ->setPublic(false);
            }
        }
    }
}
|||||
293 | |||||||
/**
 * Registers one service per configured multiple consumer under the id
 * old_sound_rabbit_mq.<key>_multiple.
 *
 * @throws InvalidConfigurationException When neither "queues" nor
 *                                       "queues_provider" is configured
 */
protected function loadMultipleConsumers()
{
    // Configuration key => setter, applied only when the key is set.
    $optionalSetters = [
        'idle_timeout'           => 'setIdleTimeout',
        'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
        'timeout_wait'           => 'setTimeoutWait',
    ];

    foreach ($this->config['multiple_consumers'] as $key => $consumer) {
        if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
            throw new InvalidConfigurationException(
                "Error on loading {$key} multiple consumer. Either 'queues' or 'queues_provider' parameters should be defined."
            );
        }

        // Index the queues by name and turn each callback service id into a
        // callable [Reference, 'execute']; remember the raw ids for later.
        $queues = [];
        $callbacks = [];
        foreach ($consumer['queues'] as $queueOptions) {
            $callbackId = $queueOptions['callback'];
            $queueOptions['callback'] = [new Reference($callbackId), 'execute'];
            $queues[$queueOptions['name']] = $queueOptions;
            $callbacks[] = $callbackId;
        }

        $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.multi_consumer');
        $definition->addMethodCall(
            'setExchangeOptions',
            [$this->normalizeArgumentKeys($consumer['exchange_options'])]
        );
        $definition->addMethodCall('setQueues', [$this->normalizeArgumentKeys($queues)]);

        if ($consumer['queues_provider']) {
            $definition->addMethodCall('setQueuesProvider', [new Reference($consumer['queues_provider'])]);
        }

        if (array_key_exists('qos_options', $consumer)) {
            $qos = $consumer['qos_options'];
            $definition->addMethodCall(
                'setQosOptions',
                [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
            );
        }

        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, [$consumer[$option]]);
            }
        }

        if (isset($consumer['graceful_max_execution'])) {
            $graceful = $consumer['graceful_max_execution'];
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                [$graceful['timeout']]
            );
            $definition->addMethodCall('setGracefulMaxExecutionTimeoutExitCode', [$graceful['exit_code']]);
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
        if (isset($consumer['options'])) {
            $definition->addMethodCall(
                'setConsumerOptions',
                [$this->normalizeArgumentKeys($consumer['options'])]
            );
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }
        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
        $this->container->setDefinition($name, $definition);

        // The queues provider (if any) is handled first, then every queue callback.
        if ($consumer['queues_provider']) {
            $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
        }
        foreach ($callbacks as $callbackId) {
            $this->addDequeuerAwareCall($callbackId, $name);
        }
    }
}
|||||
384 | |||||||
/**
 * Registers one service per configured dynamic consumer under the id
 * old_sound_rabbit_mq.<key>_dynamic.
 *
 * A dynamic consumer must declare a "queue_options_provider" service, which
 * is injected through setQueueOptionsProvider().
 *
 * @throws InvalidConfigurationException When "queue_options_provider" is
 *                                       missing or empty
 */
protected function loadDynamicConsumers()
{
    foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
        if (empty($consumer['queue_options_provider'])) {
            // The message names the option that is actually checked; it
            // previously referred to a non-existent "queue_provider" key.
            throw new InvalidConfigurationException(
                "Error on loading $key dynamic consumer. " .
                "'queue_options_provider' parameter should be defined."
            );
        }

        $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
        $definition
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.consumer')
            ->addTag('old_sound_rabbit_mq.dynamic_consumer')
            ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])])
            ->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

        if (array_key_exists('qos_options', $consumer)) {
            $definition->addMethodCall('setQosOptions', [
                $consumer['qos_options']['prefetch_size'],
                $consumer['qos_options']['prefetch_count'],
                $consumer['qos_options']['global'],
            ]);
        }

        $definition->addMethodCall(
            'setQueueOptionsProvider',
            [new Reference($consumer['queue_options_provider'])]
        );

        // Optional settings: each call is only registered when configured.
        if (isset($consumer['idle_timeout'])) {
            $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
        }
        if (isset($consumer['idle_timeout_exit_code'])) {
            $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
        }
        if (isset($consumer['timeout_wait'])) {
            $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
        }
        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                [$consumer['graceful_max_execution']['timeout']]
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                [$consumer['graceful_max_execution']['exit_code']]
            );
        }
        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
        if (isset($consumer['options'])) {
            $definition->addMethodCall(
                'setConsumerOptions',
                [$this->normalizeArgumentKeys($consumer['options'])]
            );
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }

        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
        $this->container->setDefinition($name, $definition);
        $this->addDequeuerAwareCall($consumer['callback'], $name);
        $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
    }
}
||||||
461 | |||||||
/**
 * Registers one service per configured batch consumer under the id
 * old_sound_rabbit_mq.<key>_batch.
 *
 * The callback is wired to the "batchExecute" method of the callback service.
 * Only the exchange options fall back to defaults; queue options, QoS options
 * and timeout_wait are read from the configuration unconditionally.
 */
protected function loadBatchConsumers()
{
    foreach ($this->config['batch_consumers'] as $key => $consumer) {
        $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

        $exchangeOptions = $consumer['exchange_options'] ?? $this->getDefaultExchangeOptions();
        $qos = $consumer['qos_options'];

        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.batch_consumer');
        $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
        $definition->addMethodCall('setPrefetchCount', [$qos['prefetch_count']]);
        $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']]);
        $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);
        $definition->addMethodCall(
            'setQueueOptions',
            [$this->normalizeArgumentKeys($consumer['queue_options'])]
        );
        $definition->addMethodCall(
            'setQosOptions',
            [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
        );

        // Optional settings: each call is only registered when configured.
        // Note the exit code is registered before the idle timeout itself.
        $optionalSetters = [
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'idle_timeout'           => 'setIdleTimeout',
        ];
        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, [$consumer[$option]]);
            }
        }

        // Only the graceful timeout is applied here; no exit code call is
        // registered for batch consumers.
        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                [$consumer['graceful_max_execution']['timeout']]
            );
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
        if (isset($consumer['options'])) {
            $definition->addMethodCall(
                'setConsumerOptions',
                [$this->normalizeArgumentKeys($consumer['options'])]
            );
        }
        if ($consumer['keep_alive']) {
            $definition->addMethodCall('keepAlive');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }
        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
        $this->container->setDefinition($serviceId, $definition);
    }
}
||||||
529 | |||||||
/**
 * Registers one service per configured anonymous consumer under the id
 * old_sound_rabbit_mq.<key>_anon.
 */
protected function loadAnonConsumers()
{
    foreach ($this->config['anon_consumers'] as $key => $anon) {
        $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.anon_consumer');
        $definition->addMethodCall(
            'setExchangeOptions',
            [$this->normalizeArgumentKeys($anon['exchange_options'])]
        );
        $definition->addMethodCall('setCallback', [[new Reference($anon['callback']), 'execute']]);

        // Consumer options are optional.
        if (isset($anon['options'])) {
            $definition->addMethodCall(
                'setConsumerOptions',
                [$this->normalizeArgumentKeys($anon['options'])]
            );
        }

        $this->injectConnection($definition, $anon['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $anon['connection']);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
        $this->container->setDefinition($serviceId, $definition);
        $this->addDequeuerAwareCall($anon['callback'], $serviceId);
    }
}
||||||
558 | |||||||
/**
 * Restores dashes in the keys of the "arguments" entry of an options array.
 *
 * Symfony 2 converts '-' to '_' in configuration keys, which breaks arguments
 * such as x-ha-policy; every '_' in an argument key is therefore turned back
 * into '-'. Arguments given in the legacy string form are first converted to
 * an array. When no "arguments" entry is set, the input is returned untouched.
 *
 * @param array $config Options array that may contain an "arguments" entry
 *
 * @return array The options with normalized argument keys
 */
private function normalizeArgumentKeys(array $config): array
{
    if (!isset($config['arguments'])) {
        return $config;
    }

    $rawArguments = $config['arguments'];
    if (is_string($rawArguments)) {
        // Support for the old string-based configuration format.
        $rawArguments = $this->argumentsStringAsArray($rawArguments);
    }

    $normalized = [];
    foreach ($rawArguments as $name => $value) {
        // str_replace() leaves keys without an underscore unchanged.
        $normalized[str_replace('_', '-', $name)] = $value;
    }
    $config['arguments'] = $normalized;

    return $config;
}
|||||
587 | 28 | ||||||
/**
 * Parses arguments given as a string, as used by old configuration files.
 *
 * The string is a comma-separated list of "name:value" or "name:value:type"
 * entries. Each entry becomes name => [type, value], with the type defaulting
 * to 'S' when it is not given.
 *
 * @deprecated
 * @param string $arguments
 * @return array
 */
private function argumentsStringAsArray($arguments): array
{
    $parsed = [];

    foreach (explode(',', $arguments) as $entry) {
        $parts = explode(':', $entry);
        // parts: [0] => name, [1] => value, [2] => optional type
        $parsed[$parts[0]] = [$parts[2] ?? 'S', $parts[1]];
    }

    return $parsed;
}
|||||
611 | 25 | ||||||
/**
 * Registers one public service per configured RPC client under the id
 * old_sound_rabbit_mq.<key>_rpc.
 */
protected function loadRpcClients()
{
    // Configuration key => setter, applied whenever the key exists
    // (even when its value is null).
    $optionalSetters = [
        'unserializer'    => 'setUnserializer',
        'direct_reply_to' => 'setDirectReplyTo',
    ];

    foreach ($this->config['rpc_clients'] as $key => $client) {
        $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
        $definition->setLazy($client['lazy']);
        $definition->addTag('old_sound_rabbit_mq.rpc_client');
        $definition->addMethodCall('initClient', [$client['expect_serialized_response']]);

        $this->injectConnection($definition, $client['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $client['connection']);
        }

        foreach ($optionalSetters as $option => $setter) {
            if (array_key_exists($option, $client)) {
                $definition->addMethodCall($setter, [$client[$option]]);
            }
        }
        $definition->setPublic(true);

        $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $key);
        $this->container->setDefinition($serviceId, $definition);
    }
}
||||||
635 | 34 | ||||||
/**
 * Registers one service definition per entry of the `rpc_servers` configuration.
 *
 * Each definition is public, tagged `old_sound_rabbit_mq.base_amqp` and
 * `old_sound_rabbit_mq.rpc_server`, wired to the `execute` method of its
 * callback service, and stored under the id `old_sound_rabbit_mq.<key>_server`.
 */
protected function loadRpcServers()
{
    // Optional settings passed through unchanged to a single-argument setter,
    // listed in the order the method calls must be registered.
    $optionalSetters = [
        'exchange_options' => 'setExchangeOptions',
        'queue_options' => 'setQueueOptions',
        'serializer' => 'setSerializer',
    ];

    foreach ($this->config['rpc_servers'] as $serverName => $serverConfig) {
        $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
        $rpcServer->setPublic(true);
        $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
        $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
        $rpcServer->addMethodCall('initServer', [$serverName]);
        $rpcServer->addMethodCall('setCallback', [[new Reference($serverConfig['callback']), 'execute']]);

        // Argument order matters: connection first, then the logged channel.
        $connectionName = $serverConfig['connection'];
        $this->injectConnection($rpcServer, $connectionName);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($rpcServer, $serverName, $connectionName);
        }

        if (array_key_exists('qos_options', $serverConfig)) {
            $qos = $serverConfig['qos_options'];
            $rpcServer->addMethodCall('setQosOptions', [
                $qos['prefetch_size'],
                $qos['prefetch_count'],
                $qos['global'],
            ]);
        }

        foreach ($optionalSetters as $option => $setter) {
            if (array_key_exists($option, $serverConfig)) {
                $rpcServer->addMethodCall($setter, [$serverConfig[$option]]);
            }
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $serverName);
        $this->container->setDefinition($serviceId, $rpcServer);
    }
}
||||||
669 | 29 | ||||||
/**
 * Creates a private logged-channel service and appends it as an argument.
 *
 * The channel is registered as `old_sound_rabbit_mq.channel.<name>`, given
 * the named connection as its own argument, and its id is recorded in
 * `$this->channelIds`.
 *
 * @param Definition $definition     Definition that receives the channel reference
 * @param string     $name           Suffix used to build the channel service id
 * @param string     $connectionName Name of the connection injected into the channel
 */
protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
{
    $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

    $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
    $loggedChannel->setPublic(false);
    $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
    $this->injectConnection($loggedChannel, $connectionName);

    $this->container->setDefinition($channelId, $loggedChannel);
    $this->channelIds[] = $channelId;

    $definition->addArgument(new Reference($channelId));
}
||||||
684 | 1 | ||||||
/**
 * Appends a reference to the named connection service as an argument.
 *
 * @param Definition $definition     Definition that receives the reference
 * @param string     $connectionName Suffix of the `old_sound_rabbit_mq.connection.*` service id
 */
protected function injectConnection(Definition $definition, $connectionName)
{
    $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
    $connection = new Reference($connectionId);

    $definition->addArgument($connection);
}
|||||
689 | |||||||
/**
 * Returns the alias of this extension.
 *
 * @return string
 */
public function getAlias(): string
{
    $alias = 'old_sound_rabbit_mq';

    return $alias;
}
||||||
694 | |||||||
/**
 * Add proper dequeuer aware call
 *
 * When the callback service is known to the container and its class
 * implements DequeuerAwareInterface, a `setDequeuer` method call referencing
 * the service `$name` is appended to the callback's definition. Nothing
 * happens when the callback service is unknown or declares no class.
 *
 * @param string $callback Service id of the callback
 * @param string $name     Service id referenced by the setDequeuer call
 * @throws \ReflectionException When the callback class cannot be reflected
 */
protected function addDequeuerAwareCall($callback, $name)
{
    if (!$this->container->has($callback)) {
        return;
    }

    $callbackDefinition = $this->container->findDefinition($callback);

    // The class may be written as a "%parameter%" placeholder; resolve it to
    // the actual class name before reflecting on it.
    $callbackClass = $this->container->getParameterBag()->resolveValue($callbackDefinition->getClass());

    // A definition may declare no class at all; reflecting on null would
    // fail, so there is nothing to inspect in that case.
    if (!is_string($callbackClass) || $callbackClass === '') {
        return;
    }

    $refClass = new \ReflectionClass($callbackClass);
    if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
        $callbackDefinition->addMethodCall('setDequeuer', [new Reference($name)]);
    }
}
|||||
714 | |||||||
/**
 * Tags the definition for the `phpamqplib` monolog channel and registers a
 * `setLogger` call with the `logger` service.
 *
 * The logger reference uses IGNORE_ON_INVALID_REFERENCE.
 *
 * @param Definition $definition Definition to configure
 */
private function injectLogger(Definition $definition)
{
    $loggerReference = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);

    $definition->addTag('monolog.logger', ['channel' => 'phpamqplib']);
    $definition->addMethodCall('setLogger', [$loggerReference]);
}
||||||
722 | |||||||
/**
 * Get default AMQP exchange options
 *
 * The defaults are an unnamed `direct` exchange that is passive and not
 * declared.
 *
 * @return array
 */
protected function getDefaultExchangeOptions(): array
{
    $defaults = [];
    $defaults['name'] = '';
    $defaults['type'] = 'direct';
    $defaults['passive'] = true;
    $defaults['declare'] = false;

    return $defaults;
}
||||||
737 | |||||||
/**
 * Get default AMQP queue options
 *
 * The defaults are an unnamed queue that is not declared.
 *
 * @return array
 */
protected function getDefaultQueueOptions(): array
{
    $defaults = [];
    $defaults['name'] = '';
    $defaults['declare'] = false;

    return $defaults;
}
||||||
750 | } |
||||||
751 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.