1 | <?php |
||||
2 | |||||
3 | namespace OldSound\RabbitMqBundle\DependencyInjection; |
||||
4 | |||||
5 | use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
||||
6 | use Symfony\Component\DependencyInjection\ContainerInterface; |
||||
7 | use Symfony\Component\DependencyInjection\Extension\Extension; |
||||
8 | use Symfony\Component\DependencyInjection\ContainerBuilder; |
||||
9 | use Symfony\Component\DependencyInjection\Definition; |
||||
10 | use Symfony\Component\DependencyInjection\Reference; |
||||
11 | use Symfony\Component\DependencyInjection\Loader\XmlFileLoader; |
||||
12 | use Symfony\Component\Config\FileLocator; |
||||
13 | |||||
14 | /** |
||||
15 | * OldSoundRabbitMqExtension. |
||||
16 | * |
||||
17 | * @author Alvaro Videla |
||||
18 | * @author Marc Weistroff <[email protected]> |
||||
19 | */ |
||||
20 | class OldSoundRabbitMqExtension extends Extension |
||||
21 | { |
||||
22 | /** |
||||
23 | * @var ContainerBuilder |
||||
24 | */ |
||||
25 | private $container; |
||||
26 | |||||
27 | /** |
||||
28 | * @var Boolean Whether the data collector is enabled |
||||
29 | */ |
||||
30 | private $collectorEnabled; |
||||
31 | |||||
32 | private $channelIds = array(); |
||||
33 | |||||
34 | private $config = array(); |
||||
35 | |||||
36 | 31 | public function load(array $configs, ContainerBuilder $container) |
|||
37 | { |
||||
38 | 31 | $this->container = $container; |
|||
39 | |||||
40 | 31 | $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config'))); |
|||
41 | 31 | $loader->load('rabbitmq.xml'); |
|||
42 | |||||
43 | 31 | $configuration = $this->getConfiguration($configs, $container); |
|||
44 | 31 | $this->config = $this->processConfiguration($configuration, $configs); |
|||
45 | |||||
46 | 31 | $this->collectorEnabled = $this->config['enable_collector']; |
|||
47 | |||||
48 | 31 | $this->loadConnections(); |
|||
49 | 31 | $this->loadBindings(); |
|||
50 | 31 | $this->loadProducers(); |
|||
51 | 31 | $this->loadConsumers(); |
|||
52 | 31 | $this->loadMultipleConsumers(); |
|||
53 | 31 | $this->loadDynamicConsumers(); |
|||
54 | 31 | $this->loadBatchConsumers(); |
|||
55 | 31 | $this->loadAnonConsumers(); |
|||
56 | 31 | $this->loadRpcClients(); |
|||
57 | 31 | $this->loadRpcServers(); |
|||
58 | |||||
59 | 31 | if ($this->collectorEnabled && $this->channelIds) { |
|||
0 ignored issues
–
show
|
|||||
60 | 28 | $channels = array(); |
|||
61 | 28 | foreach (array_unique($this->channelIds) as $id) { |
|||
62 | 28 | $channels[] = new Reference($id); |
|||
63 | } |
||||
64 | |||||
65 | 28 | $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector'); |
|||
66 | 28 | $definition->replaceArgument(0, $channels); |
|||
67 | } else { |
||||
68 | 3 | $this->container->removeDefinition('old_sound_rabbit_mq.data_collector'); |
|||
69 | } |
||||
70 | 31 | } |
|||
71 | |||||
72 | 31 | public function getConfiguration(array $config, ContainerBuilder $container) |
|||
73 | { |
||||
74 | 31 | return new Configuration($this->getAlias()); |
|||
75 | } |
||||
76 | |||||
77 | 31 | protected function loadConnections() |
|||
78 | { |
||||
79 | 31 | foreach ($this->config['connections'] as $key => $connection) { |
|||
80 | 31 | $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class'; |
|||
81 | $classParam = |
||||
82 | 31 | $connection['lazy'] |
|||
83 | 22 | ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%' |
|||
84 | 31 | : '%old_sound_rabbit_mq.'.$connectionSuffix.'%'; |
|||
85 | |||||
86 | 31 | $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array( |
|||
87 | 31 | $classParam, $connection, |
|||
88 | )); |
||||
89 | 31 | if (isset($connection['connection_parameters_provider'])) { |
|||
90 | $definition->addArgument(new Reference($connection['connection_parameters_provider'])); |
||||
91 | unset($connection['connection_parameters_provider']); |
||||
92 | } |
||||
93 | 31 | $definition->setPublic(false); |
|||
94 | 31 | $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key); |
|||
95 | 31 | $this->container->setDefinition($factoryName, $definition); |
|||
96 | |||||
97 | 31 | $definition = new Definition($classParam); |
|||
98 | 31 | if (method_exists($definition, 'setFactory')) { |
|||
99 | // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6 |
||||
100 | 31 | $definition->setFactory(array(new Reference($factoryName), 'createConnection')); |
|||
101 | } else { |
||||
102 | // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6 |
||||
103 | $definition->setFactoryService($factoryName); |
||||
104 | $definition->setFactoryMethod('createConnection'); |
||||
105 | } |
||||
106 | 31 | $definition->addTag('old_sound_rabbit_mq.connection'); |
|||
107 | 31 | $definition->setPublic(true); |
|||
108 | |||||
109 | 31 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition); |
|||
110 | } |
||||
111 | 31 | } |
|||
112 | |||||
113 | 31 | protected function loadBindings() |
|||
114 | { |
||||
115 | 31 | if ($this->config['sandbox']) { |
|||
116 | return; |
||||
117 | } |
||||
118 | 31 | foreach ($this->config['bindings'] as $binding) { |
|||
119 | 22 | ksort($binding); |
|||
120 | 22 | $definition = new Definition($binding['class']); |
|||
121 | 22 | $definition->addTag('old_sound_rabbit_mq.binding'); |
|||
122 | 22 | $definition->addMethodCall('setArguments', array($binding['arguments'])); |
|||
123 | 22 | $definition->addMethodCall('setDestination', array($binding['destination'])); |
|||
124 | 22 | $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange'])); |
|||
125 | 22 | $definition->addMethodCall('setExchange', array($binding['exchange'])); |
|||
126 | 22 | $definition->addMethodCall('isNowait', array($binding['nowait'])); |
|||
127 | 22 | $definition->addMethodCall('setRoutingKey', array($binding['routing_key'])); |
|||
128 | 22 | $this->injectConnection($definition, $binding['connection']); |
|||
129 | 22 | $key = md5(json_encode($binding)); |
|||
130 | 22 | if ($this->collectorEnabled) { |
|||
131 | // in the context of a binding, I don't thing logged channels are needed? |
||||
132 | 22 | $this->injectLoggedChannel($definition, $key, $binding['connection']); |
|||
133 | } |
||||
134 | 22 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition); |
|||
135 | } |
||||
136 | 31 | } |
|||
137 | |||||
138 | 31 | protected function loadProducers() |
|||
139 | { |
||||
140 | 31 | if ($this->config['sandbox'] == false) { |
|||
141 | 31 | foreach ($this->config['producers'] as $key => $producer) { |
|||
142 | 27 | $definition = new Definition($producer['class']); |
|||
143 | 27 | $definition->setPublic(true); |
|||
144 | 27 | $definition->addTag('old_sound_rabbit_mq.base_amqp'); |
|||
145 | 27 | $definition->addTag('old_sound_rabbit_mq.producer'); |
|||
146 | //this producer doesn't define an exchange -> using AMQP Default |
||||
147 | 27 | if (!isset($producer['exchange_options'])) { |
|||
148 | 1 | $producer['exchange_options'] = $this->getDefaultExchangeOptions(); |
|||
149 | } |
||||
150 | 27 | $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options']))); |
|||
151 | //this producer doesn't define a queue -> using AMQP Default |
||||
152 | 27 | if (!isset($producer['queue_options'])) { |
|||
153 | 27 | $producer['queue_options'] = $this->getDefaultQueueOptions(); |
|||
154 | } |
||||
155 | 27 | $definition->addMethodCall('setQueueOptions', array($producer['queue_options'])); |
|||
156 | 27 | $this->injectConnection($definition, $producer['connection']); |
|||
157 | 27 | if ($this->collectorEnabled) { |
|||
158 | 25 | $this->injectLoggedChannel($definition, $key, $producer['connection']); |
|||
159 | } |
||||
160 | 27 | if (!$producer['auto_setup_fabric']) { |
|||
161 | $definition->addMethodCall('disableAutoSetupFabric'); |
||||
162 | } |
||||
163 | |||||
164 | 27 | if ($producer['enable_logger']) { |
|||
165 | $this->injectLogger($definition); |
||||
166 | } |
||||
167 | |||||
168 | 27 | $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key); |
|||
169 | |||||
170 | 27 | $this->container->setDefinition($producerServiceName, $definition); |
|||
171 | 27 | if (null !== $producer['service_alias']) { |
|||
172 | 22 | $this->container->setAlias($producer['service_alias'], $producerServiceName); |
|||
173 | } |
||||
174 | } |
||||
175 | } else { |
||||
176 | foreach ($this->config['producers'] as $key => $producer) { |
||||
177 | $definition = new Definition('%old_sound_rabbit_mq.fallback.class%'); |
||||
178 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition); |
||||
179 | } |
||||
180 | } |
||||
181 | 31 | } |
|||
182 | |||||
183 | 31 | protected function loadConsumers() |
|||
184 | { |
||||
185 | 31 | foreach ($this->config['consumers'] as $key => $consumer) { |
|||
186 | 26 | $definition = new Definition('%old_sound_rabbit_mq.consumer.class%'); |
|||
187 | 26 | $definition->setPublic(true); |
|||
188 | 26 | $definition->addTag('old_sound_rabbit_mq.base_amqp'); |
|||
189 | 26 | $definition->addTag('old_sound_rabbit_mq.consumer'); |
|||
190 | //this consumer doesn't define an exchange -> using AMQP Default |
||||
191 | 26 | if (!isset($consumer['exchange_options'])) { |
|||
192 | $consumer['exchange_options'] = $this->getDefaultExchangeOptions(); |
||||
193 | } |
||||
194 | 26 | $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options']))); |
|||
195 | //this consumer doesn't define a queue -> using AMQP Default |
||||
196 | 26 | if (!isset($consumer['queue_options'])) { |
|||
197 | $consumer['queue_options'] = $this->getDefaultQueueOptions(); |
||||
198 | } |
||||
199 | 26 | $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options']))); |
|||
200 | 26 | $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute'))); |
|||
201 | |||||
202 | 26 | if (array_key_exists('qos_options', $consumer)) { |
|||
203 | 22 | $definition->addMethodCall('setQosOptions', array( |
|||
204 | 22 | $consumer['qos_options']['prefetch_size'], |
|||
205 | 22 | $consumer['qos_options']['prefetch_count'], |
|||
206 | 22 | $consumer['qos_options']['global'] |
|||
207 | )); |
||||
208 | } |
||||
209 | |||||
210 | 26 | if (isset($consumer['idle_timeout'])) { |
|||
211 | $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout'])); |
||||
212 | } |
||||
213 | 26 | if (isset($consumer['idle_timeout_exit_code'])) { |
|||
214 | $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code'])); |
||||
215 | } |
||||
216 | 26 | if (isset($consumer['timeout_wait'])) { |
|||
217 | 22 | $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait'])); |
|||
218 | } |
||||
219 | 26 | if (isset($consumer['graceful_max_execution'])) { |
|||
220 | $definition->addMethodCall( |
||||
221 | 'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture', |
||||
222 | array($consumer['graceful_max_execution']['timeout']) |
||||
223 | ); |
||||
224 | $definition->addMethodCall( |
||||
225 | 'setGracefulMaxExecutionTimeoutExitCode', |
||||
226 | array($consumer['graceful_max_execution']['exit_code']) |
||||
227 | ); |
||||
228 | } |
||||
229 | 26 | if (!$consumer['auto_setup_fabric']) { |
|||
230 | $definition->addMethodCall('disableAutoSetupFabric'); |
||||
231 | } |
||||
232 | |||||
233 | 26 | $this->injectConnection($definition, $consumer['connection']); |
|||
234 | 26 | if ($this->collectorEnabled) { |
|||
235 | 25 | $this->injectLoggedChannel($definition, $key, $consumer['connection']); |
|||
236 | } |
||||
237 | |||||
238 | 26 | if ($consumer['enable_logger']) { |
|||
239 | 1 | $this->injectLogger($definition); |
|||
240 | } |
||||
241 | |||||
242 | 26 | $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key); |
|||
243 | 26 | $this->container->setDefinition($name, $definition); |
|||
244 | 26 | $this->addDequeuerAwareCall($consumer['callback'], $name); |
|||
245 | } |
||||
246 | 31 | } |
|||
247 | |||||
248 | 31 | protected function loadMultipleConsumers() |
|||
249 | { |
||||
250 | 31 | foreach ($this->config['multiple_consumers'] as $key => $consumer) { |
|||
251 | 22 | $queues = array(); |
|||
252 | 22 | $callbacks = array(); |
|||
253 | |||||
254 | 22 | if (empty($consumer['queues']) && empty($consumer['queues_provider'])) { |
|||
255 | throw new InvalidConfigurationException( |
||||
256 | "Error on loading $key multiple consumer. " . |
||||
257 | "Either 'queues' or 'queues_provider' parameters should be defined." |
||||
258 | ); |
||||
259 | } |
||||
260 | |||||
261 | 22 | foreach ($consumer['queues'] as $queueName => $queueOptions) { |
|||
262 | 22 | $queues[$queueOptions['name']] = $queueOptions; |
|||
263 | 22 | $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute'); |
|||
264 | 22 | $callbacks[] = $queueOptions['callback']; |
|||
265 | } |
||||
266 | |||||
267 | 22 | $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%'); |
|||
268 | $definition |
||||
269 | 22 | ->setPublic(true) |
|||
270 | 22 | ->addTag('old_sound_rabbit_mq.base_amqp') |
|||
271 | 22 | ->addTag('old_sound_rabbit_mq.multi_consumer') |
|||
272 | 22 | ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options']))) |
|||
273 | 22 | ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues))); |
|||
274 | |||||
275 | 22 | if ($consumer['queues_provider']) { |
|||
276 | 22 | $definition->addMethodCall( |
|||
277 | 22 | 'setQueuesProvider', |
|||
278 | 22 | array(new Reference($consumer['queues_provider'])) |
|||
279 | ); |
||||
280 | } |
||||
281 | |||||
282 | 22 | if (array_key_exists('qos_options', $consumer)) { |
|||
283 | $definition->addMethodCall('setQosOptions', array( |
||||
284 | $consumer['qos_options']['prefetch_size'], |
||||
285 | $consumer['qos_options']['prefetch_count'], |
||||
286 | $consumer['qos_options']['global'] |
||||
287 | )); |
||||
288 | } |
||||
289 | |||||
290 | 22 | if (isset($consumer['idle_timeout'])) { |
|||
291 | $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout'])); |
||||
292 | } |
||||
293 | 22 | if (isset($consumer['idle_timeout_exit_code'])) { |
|||
294 | $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code'])); |
||||
295 | } |
||||
296 | 22 | if (isset($consumer['timeout_wait'])) { |
|||
297 | 22 | $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait'])); |
|||
298 | } |
||||
299 | 22 | if (isset($consumer['graceful_max_execution'])) { |
|||
300 | $definition->addMethodCall( |
||||
301 | 'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture', |
||||
302 | array($consumer['graceful_max_execution']['timeout']) |
||||
303 | ); |
||||
304 | $definition->addMethodCall( |
||||
305 | 'setGracefulMaxExecutionTimeoutExitCode', |
||||
306 | array($consumer['graceful_max_execution']['exit_code']) |
||||
307 | ); |
||||
308 | } |
||||
309 | 22 | if (!$consumer['auto_setup_fabric']) { |
|||
310 | $definition->addMethodCall('disableAutoSetupFabric'); |
||||
311 | } |
||||
312 | |||||
313 | 22 | $this->injectConnection($definition, $consumer['connection']); |
|||
314 | 22 | if ($this->collectorEnabled) { |
|||
315 | 22 | $this->injectLoggedChannel($definition, $key, $consumer['connection']); |
|||
316 | } |
||||
317 | |||||
318 | 22 | if ($consumer['enable_logger']) { |
|||
319 | $this->injectLogger($definition); |
||||
320 | } |
||||
321 | |||||
322 | 22 | $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key); |
|||
323 | 22 | $this->container->setDefinition($name, $definition); |
|||
324 | 22 | if ($consumer['queues_provider']) { |
|||
325 | 22 | $this->addDequeuerAwareCall($consumer['queues_provider'], $name); |
|||
326 | } |
||||
327 | 22 | foreach ($callbacks as $callback) { |
|||
328 | 22 | $this->addDequeuerAwareCall($callback, $name); |
|||
329 | } |
||||
330 | } |
||||
331 | 31 | } |
|||
332 | |||||
333 | 31 | protected function loadDynamicConsumers() |
|||
334 | { |
||||
335 | 31 | foreach ($this->config['dynamic_consumers'] as $key => $consumer) { |
|||
336 | |||||
337 | 22 | if (empty($consumer['queue_options_provider'])) { |
|||
338 | throw new InvalidConfigurationException( |
||||
339 | "Error on loading $key dynamic consumer. " . |
||||
340 | "'queue_provider' parameter should be defined." |
||||
341 | ); |
||||
342 | } |
||||
343 | |||||
344 | 22 | $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%'); |
|||
345 | $definition |
||||
346 | 22 | ->setPublic(true) |
|||
347 | 22 | ->addTag('old_sound_rabbit_mq.base_amqp') |
|||
348 | 22 | ->addTag('old_sound_rabbit_mq.consumer') |
|||
349 | 22 | ->addTag('old_sound_rabbit_mq.dynamic_consumer') |
|||
350 | 22 | ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options']))) |
|||
351 | 22 | ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute'))); |
|||
352 | |||||
353 | 22 | if (array_key_exists('qos_options', $consumer)) { |
|||
354 | $definition->addMethodCall('setQosOptions', array( |
||||
355 | $consumer['qos_options']['prefetch_size'], |
||||
356 | $consumer['qos_options']['prefetch_count'], |
||||
357 | $consumer['qos_options']['global'] |
||||
358 | )); |
||||
359 | } |
||||
360 | |||||
361 | 22 | $definition->addMethodCall( |
|||
362 | 22 | 'setQueueOptionsProvider', |
|||
363 | 22 | array(new Reference($consumer['queue_options_provider'])) |
|||
364 | ); |
||||
365 | |||||
366 | 22 | if (isset($consumer['idle_timeout'])) { |
|||
367 | $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout'])); |
||||
368 | } |
||||
369 | 22 | if (isset($consumer['idle_timeout_exit_code'])) { |
|||
370 | $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code'])); |
||||
371 | } |
||||
372 | 22 | if (isset($consumer['timeout_wait'])) { |
|||
373 | $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait'])); |
||||
374 | } |
||||
375 | 22 | if (isset($consumer['graceful_max_execution'])) { |
|||
376 | $definition->addMethodCall( |
||||
377 | 'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture', |
||||
378 | array($consumer['graceful_max_execution']['timeout']) |
||||
379 | ); |
||||
380 | $definition->addMethodCall( |
||||
381 | 'setGracefulMaxExecutionTimeoutExitCode', |
||||
382 | array($consumer['graceful_max_execution']['exit_code']) |
||||
383 | ); |
||||
384 | } |
||||
385 | 22 | if (!$consumer['auto_setup_fabric']) { |
|||
386 | $definition->addMethodCall('disableAutoSetupFabric'); |
||||
387 | } |
||||
388 | |||||
389 | 22 | $this->injectConnection($definition, $consumer['connection']); |
|||
390 | 22 | if ($this->collectorEnabled) { |
|||
391 | 22 | $this->injectLoggedChannel($definition, $key, $consumer['connection']); |
|||
392 | } |
||||
393 | |||||
394 | 22 | if ($consumer['enable_logger']) { |
|||
395 | $this->injectLogger($definition); |
||||
396 | } |
||||
397 | |||||
398 | 22 | $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key); |
|||
399 | 22 | $this->container->setDefinition($name, $definition); |
|||
400 | 22 | $this->addDequeuerAwareCall($consumer['callback'], $name); |
|||
401 | 22 | $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name); |
|||
402 | } |
||||
403 | 31 | } |
|||
404 | |||||
405 | 31 | protected function loadBatchConsumers() |
|||
406 | { |
||||
407 | 31 | foreach ($this->config['batch_consumers'] as $key => $consumer) { |
|||
408 | $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%'); |
||||
409 | |||||
410 | if (!isset($consumer['exchange_options'])) { |
||||
411 | $consumer['exchange_options'] = $this->getDefaultExchangeOptions(); |
||||
412 | } |
||||
413 | |||||
414 | $definition |
||||
415 | ->setPublic(true) |
||||
416 | ->addTag('old_sound_rabbit_mq.base_amqp') |
||||
417 | ->addTag('old_sound_rabbit_mq.batch_consumer') |
||||
418 | ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait'])) |
||||
419 | ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count'])) |
||||
420 | ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute'))) |
||||
421 | ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options']))) |
||||
422 | ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options']))) |
||||
423 | ->addMethodCall('setQosOptions', array( |
||||
424 | $consumer['qos_options']['prefetch_size'], |
||||
425 | $consumer['qos_options']['prefetch_count'], |
||||
426 | $consumer['qos_options']['global'] |
||||
427 | )) |
||||
428 | ; |
||||
429 | |||||
430 | if (isset($consumer['idle_timeout_exit_code'])) { |
||||
431 | $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code'])); |
||||
432 | } |
||||
433 | |||||
434 | if (isset($consumer['idle_timeout'])) { |
||||
435 | $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout'])); |
||||
436 | } |
||||
437 | |||||
438 | if (isset($consumer['graceful_max_execution'])) { |
||||
439 | $definition->addMethodCall( |
||||
440 | 'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture', |
||||
441 | array($consumer['graceful_max_execution']['timeout']) |
||||
442 | ); |
||||
443 | } |
||||
444 | |||||
445 | if (!$consumer['auto_setup_fabric']) { |
||||
446 | $definition->addMethodCall('disableAutoSetupFabric'); |
||||
447 | } |
||||
448 | |||||
449 | if ($consumer['keep_alive']) { |
||||
450 | $definition->addMethodCall('keepAlive'); |
||||
451 | } |
||||
452 | |||||
453 | $this->injectConnection($definition, $consumer['connection']); |
||||
454 | if ($this->collectorEnabled) { |
||||
455 | $this->injectLoggedChannel($definition, $key, $consumer['connection']); |
||||
456 | } |
||||
457 | |||||
458 | if ($consumer['enable_logger']) { |
||||
459 | $this->injectLogger($definition); |
||||
460 | } |
||||
461 | |||||
462 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition); |
||||
463 | } |
||||
464 | 31 | } |
|||
465 | |||||
466 | 31 | protected function loadAnonConsumers() |
|||
467 | { |
||||
468 | 31 | foreach ($this->config['anon_consumers'] as $key => $anon) { |
|||
469 | 22 | $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%'); |
|||
470 | $definition |
||||
471 | 22 | ->setPublic(true) |
|||
472 | 22 | ->addTag('old_sound_rabbit_mq.base_amqp') |
|||
473 | 22 | ->addTag('old_sound_rabbit_mq.anon_consumer') |
|||
474 | 22 | ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options']))) |
|||
475 | 22 | ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute'))); |
|||
476 | 22 | $this->injectConnection($definition, $anon['connection']); |
|||
477 | 22 | if ($this->collectorEnabled) { |
|||
478 | 22 | $this->injectLoggedChannel($definition, $key, $anon['connection']); |
|||
479 | } |
||||
480 | |||||
481 | 22 | $name = sprintf('old_sound_rabbit_mq.%s_anon', $key); |
|||
482 | 22 | $this->container->setDefinition($name, $definition); |
|||
483 | 22 | $this->addDequeuerAwareCall($anon['callback'], $name); |
|||
484 | } |
||||
485 | 31 | } |
|||
486 | |||||
487 | /** |
||||
488 | * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy |
||||
489 | * parameter. So we revert the change for right configurations. |
||||
490 | * |
||||
491 | * @param array $config |
||||
492 | * |
||||
493 | * @return array |
||||
494 | */ |
||||
495 | 27 | private function normalizeArgumentKeys(array $config) |
|||
496 | { |
||||
497 | 27 | if (isset($config['arguments'])) { |
|||
498 | 1 | $arguments = $config['arguments']; |
|||
499 | // support for old configuration |
||||
500 | 1 | if (is_string($arguments)) { |
|||
501 | $arguments = $this->argumentsStringAsArray($arguments); |
||||
0 ignored issues
–
show
The function
OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||
502 | } |
||||
503 | |||||
504 | 1 | $newArguments = array(); |
|||
505 | 1 | foreach ($arguments as $key => $value) { |
|||
506 | 1 | if (strstr($key, '_')) { |
|||
507 | $key = str_replace('_', '-', $key); |
||||
508 | } |
||||
509 | 1 | $newArguments[$key] = $value; |
|||
510 | } |
||||
511 | 1 | $config['arguments'] = $newArguments; |
|||
512 | } |
||||
513 | 27 | return $config; |
|||
514 | } |
||||
515 | |||||
516 | /** |
||||
517 | * Support for arguments provided as string. Support for old configuration files. |
||||
518 | * |
||||
519 | * @deprecated |
||||
520 | * @param string $arguments |
||||
521 | * @return array |
||||
522 | */ |
||||
523 | private function argumentsStringAsArray($arguments) |
||||
524 | { |
||||
525 | $argumentsArray = array(); |
||||
526 | |||||
527 | $argumentPairs = explode(',', $arguments); |
||||
528 | foreach ($argumentPairs as $argument) { |
||||
529 | $argumentPair = explode(':', $argument); |
||||
530 | $type = 'S'; |
||||
531 | if (isset($argumentPair[2])) { |
||||
532 | $type = $argumentPair[2]; |
||||
533 | } |
||||
534 | $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]); |
||||
535 | } |
||||
536 | |||||
537 | return $argumentsArray; |
||||
538 | } |
||||
539 | |||||
540 | 31 | protected function loadRpcClients() |
|||
541 | { |
||||
542 | 31 | foreach ($this->config['rpc_clients'] as $key => $client) { |
|||
543 | 25 | $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%'); |
|||
544 | 25 | $definition->setLazy($client['lazy']); |
|||
545 | $definition |
||||
546 | 25 | ->addTag('old_sound_rabbit_mq.rpc_client') |
|||
547 | 25 | ->addMethodCall('initClient', array($client['expect_serialized_response'])); |
|||
548 | 25 | $this->injectConnection($definition, $client['connection']); |
|||
549 | 25 | if ($this->collectorEnabled) { |
|||
550 | 25 | $this->injectLoggedChannel($definition, $key, $client['connection']); |
|||
551 | } |
||||
552 | 25 | if (array_key_exists('unserializer', $client)) { |
|||
553 | 25 | $definition->addMethodCall('setUnserializer', array($client['unserializer'])); |
|||
554 | } |
||||
555 | 25 | if (array_key_exists('direct_reply_to', $client)) { |
|||
556 | 25 | $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to'])); |
|||
557 | } |
||||
558 | 25 | $definition->setPublic(true); |
|||
559 | |||||
560 | 25 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition); |
|||
561 | } |
||||
562 | 31 | } |
|||
563 | |||||
564 | 31 | protected function loadRpcServers() |
|||
565 | { |
||||
566 | 31 | foreach ($this->config['rpc_servers'] as $key => $server) { |
|||
567 | 22 | $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%'); |
|||
568 | $definition |
||||
569 | 22 | ->setPublic(true) |
|||
570 | 22 | ->addTag('old_sound_rabbit_mq.base_amqp') |
|||
571 | 22 | ->addTag('old_sound_rabbit_mq.rpc_server') |
|||
572 | 22 | ->addMethodCall('initServer', array($key)) |
|||
573 | 22 | ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute'))); |
|||
574 | 22 | $this->injectConnection($definition, $server['connection']); |
|||
575 | 22 | if ($this->collectorEnabled) { |
|||
576 | 22 | $this->injectLoggedChannel($definition, $key, $server['connection']); |
|||
577 | } |
||||
578 | 22 | if (array_key_exists('qos_options', $server)) { |
|||
579 | $definition->addMethodCall('setQosOptions', array( |
||||
580 | $server['qos_options']['prefetch_size'], |
||||
581 | $server['qos_options']['prefetch_count'], |
||||
582 | $server['qos_options']['global'] |
||||
583 | )); |
||||
584 | } |
||||
585 | 22 | if (array_key_exists('exchange_options', $server)) { |
|||
586 | 22 | $definition->addMethodCall('setExchangeOptions', array($server['exchange_options'])); |
|||
587 | } |
||||
588 | 22 | if (array_key_exists('queue_options', $server)) { |
|||
589 | 22 | $definition->addMethodCall('setQueueOptions', array($server['queue_options'])); |
|||
590 | } |
||||
591 | 22 | if (array_key_exists('serializer', $server)) { |
|||
592 | 22 | $definition->addMethodCall('setSerializer', array($server['serializer'])); |
|||
593 | } |
||||
594 | 22 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition); |
|||
595 | } |
||||
596 | 31 | } |
|||
597 | |||||
598 | 28 | protected function injectLoggedChannel(Definition $definition, $name, $connectionName) |
|||
599 | { |
||||
600 | 28 | $id = sprintf('old_sound_rabbit_mq.channel.%s', $name); |
|||
601 | 28 | $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%'); |
|||
602 | $channel |
||||
603 | 28 | ->setPublic(false) |
|||
604 | 28 | ->addTag('old_sound_rabbit_mq.logged_channel'); |
|||
605 | 28 | $this->injectConnection($channel, $connectionName); |
|||
606 | |||||
607 | 28 | $this->container->setDefinition($id, $channel); |
|||
608 | |||||
609 | 28 | $this->channelIds[] = $id; |
|||
610 | 28 | $definition->addArgument(new Reference($id)); |
|||
611 | 28 | } |
|||
612 | |||||
613 | 30 | protected function injectConnection(Definition $definition, $connectionName) |
|||
614 | { |
||||
615 | 30 | $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName))); |
|||
616 | 30 | } |
|||
617 | |||||
618 | 31 | public function getAlias() |
|||
619 | { |
||||
620 | 31 | return 'old_sound_rabbit_mq'; |
|||
621 | } |
||||
622 | |||||
623 | /** |
||||
624 | * Add proper dequeuer aware call |
||||
625 | * |
||||
626 | * @param string $callback |
||||
627 | * @param string $name |
||||
628 | */ |
||||
629 | 26 | protected function addDequeuerAwareCall($callback, $name) |
|||
630 | { |
||||
631 | 26 | if (!$this->container->has($callback)) { |
|||
632 | 26 | return; |
|||
633 | } |
||||
634 | |||||
635 | $callbackDefinition = $this->container->findDefinition($callback); |
||||
636 | $refClass = new \ReflectionClass($callbackDefinition->getClass()); |
||||
637 | if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) { |
||||
638 | $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name))); |
||||
639 | } |
||||
640 | } |
||||
641 | |||||
642 | 1 | private function injectLogger(Definition $definition) |
|||
643 | { |
||||
644 | 1 | $definition->addTag('monolog.logger', array( |
|||
645 | 1 | 'channel' => 'phpamqplib' |
|||
646 | )); |
||||
647 | 1 | $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE))); |
|||
648 | 1 | } |
|||
649 | |||||
650 | /** |
||||
651 | * Get default AMQP exchange options |
||||
652 | * |
||||
653 | * @return array |
||||
654 | */ |
||||
655 | 1 | protected function getDefaultExchangeOptions() |
|||
656 | { |
||||
657 | return array( |
||||
658 | 1 | 'name' => '', |
|||
659 | 'type' => 'direct', |
||||
660 | 'passive' => true, |
||||
661 | 'declare' => false |
||||
662 | ); |
||||
663 | } |
||||
664 | |||||
665 | /** |
||||
666 | * Get default AMQP queue options |
||||
667 | * |
||||
668 | * @return array |
||||
669 | */ |
||||
670 | 27 | protected function getDefaultQueueOptions() |
|||
671 | { |
||||
672 | return array( |
||||
673 | 27 | 'name' => '', |
|||
674 | 'declare' => false |
||||
675 | ); |
||||
676 | } |
||||
677 | } |
||||
678 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.