1 | <?php |
||
2 | |||
3 | namespace OldSound\RabbitMqBundle\DependencyInjection; |
||
4 | |||
5 | use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
||
6 | use Symfony\Component\DependencyInjection\ContainerInterface; |
||
7 | use Symfony\Component\DependencyInjection\Extension\Extension; |
||
8 | use Symfony\Component\DependencyInjection\ContainerBuilder; |
||
9 | use Symfony\Component\DependencyInjection\Definition; |
||
10 | use Symfony\Component\DependencyInjection\Reference; |
||
11 | use Symfony\Component\DependencyInjection\Loader\XmlFileLoader; |
||
12 | use Symfony\Component\Config\FileLocator; |
||
13 | |||
14 | /** |
||
15 | * OldSoundRabbitMqExtension. |
||
16 | * |
||
17 | * @author Alvaro Videla |
||
18 | * @author Marc Weistroff <[email protected]> |
||
19 | */ |
||
class OldSoundRabbitMqExtension extends Extension
{
    /**
     * Container being populated by the current load() call.
     *
     * @var ContainerBuilder
     */
    private $container;

    /**
     * @var bool Whether the data collector is enabled
     */
    private $collectorEnabled;

    /**
     * Ids of the logged channel services registered during the current load() call.
     *
     * @var string[]
     */
    private $channelIds = array();

    /**
     * Processed bundle configuration.
     *
     * @var array
     */
    private $config = array();

    /**
     * Loads the bundle service definitions and registers one service per configured
     * connection, binding, producer, consumer (all flavours) and RPC client/server.
     *
     * When the collector is enabled and at least one logged channel was registered, the
     * data collector receives references to those channels; otherwise its definition is removed.
     *
     * @param array            $configs
     * @param ContainerBuilder $container
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;
        // Channel ids are accumulated while loading; reset them so that a previous load() on this
        // same instance cannot leak ids of services that do not exist in the current container.
        $this->channelIds = array();

        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        if ($this->collectorEnabled && count($this->channelIds) > 0) {
            $channels = array();
            // The channel id is derived from the configuration key only, so the same id can be
            // collected more than once; reference each channel a single time.
            foreach (array_unique($this->channelIds) as $id) {
                $channels[] = new Reference($id);
            }

            $definition = $this->container->getDefinition('old_sound_rabbit_mq.data_collector');
            $definition->replaceArgument(0, $channels);
        } else {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
        }
    }

    /**
     * Returns the configuration definition, rooted at this extension's alias.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        return new Configuration($this->getAlias());
    }

    /**
     * Registers, for every configured connection, a private connection factory service and a
     * public connection service created through that factory.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $key => $connection) {
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $classParam = $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
                : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';

            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
                $classParam, $connection,
            ));
            if (isset($connection['connection_parameters_provider'])) {
                // The provider id also remains inside the $connection array given above: the array
                // was copied into the definition arguments when the definition was created.
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
            }
            $definition->setPublic(false);
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
            $this->container->setDefinition($factoryName, $definition);

            $definition = new Definition($classParam);
            if (method_exists($definition, 'setFactory')) {
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
                $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
            } else {
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
                $definition->setFactoryService($factoryName);
                $definition->setFactoryMethod('createConnection');
            }
            $definition->addTag('old_sound_rabbit_mq.connection');
            $definition->setPublic(true);

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
        }
    }

    /**
     * Registers one service per configured binding. Nothing is registered in sandbox mode.
     *
     * The service id suffix is an md5 of the key-sorted binding configuration, so the same
     * binding always maps to the same id.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        foreach ($this->config['bindings'] as $binding) {
            ksort($binding);
            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');
            $definition->addMethodCall('setArguments', array($binding['arguments']));
            $definition->addMethodCall('setDestination', array($binding['destination']));
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
            $definition->addMethodCall('setExchange', array($binding['exchange']));
            // NOTE(review): every other call here is a setter while 'isNowait' reads like a getter;
            // presumably 'setNowait' was intended - confirm against the binding class before changing.
            $definition->addMethodCall('isNowait', array($binding['nowait']));
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));

            $key = md5(json_encode($binding));
            // in the context of a binding, I don't think logged channels are needed?
            $this->injectConnectionAndChannel($definition, $key, $binding['connection']);

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
        }
    }

    /**
     * Registers one service per configured producer.
     *
     * In sandbox mode every producer is replaced by a bare definition of the fallback class.
     */
    protected function loadProducers()
    {
        if ($this->config['sandbox']) {
            foreach ($this->config['producers'] as $key => $producer) {
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
            }

            return;
        }

        foreach ($this->config['producers'] as $key => $producer) {
            $definition = new Definition($producer['class']);
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.producer');
            //this producer doesn't define an exchange -> using AMQP Default
            if (!isset($producer['exchange_options'])) {
                $producer['exchange_options'] = $this->getDefaultExchangeOptions();
            }
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
            //this producer doesn't define a queue -> using AMQP Default
            if (!isset($producer['queue_options'])) {
                $producer['queue_options'] = $this->getDefaultQueueOptions();
            }
            // NOTE(review): unlike consumers, producer queue options are passed without
            // normalizeArgumentKeys(); kept as is - confirm whether that difference is intended.
            $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));

            $this->injectConnectionAndChannel($definition, $key, $producer['connection']);

            if (!$producer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($producer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);

            $this->container->setDefinition($producerServiceName, $definition);
            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $producerServiceName);
            }
        }
    }

    /**
     * Registers one service per configured consumer and, when the callback service is
     * dequeuer aware, wires the consumer back into it.
     */
    protected function loadConsumers()
    {
        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.consumer');
            //this consumer doesn't define an exchange -> using AMQP Default
            if (!isset($consumer['exchange_options'])) {
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
            }
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
            //this consumer doesn't define a queue -> using AMQP Default
            if (!isset($consumer['queue_options'])) {
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
            }
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            $this->addQosOptionsCall($definition, $consumer);
            $this->addConsumerLifecycleCalls($definition, $consumer);
            $this->injectConnectionAndChannel($definition, $key, $consumer['connection']);

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
        }
    }

    /**
     * Registers one service per configured multiple consumer.
     *
     * @throws InvalidConfigurationException when neither 'queues' nor 'queues_provider' is configured
     */
    protected function loadMultipleConsumers()
    {
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            $queues = array();
            $callbacks = array();

            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            foreach ($consumer['queues'] as $queueOptions) {
                $queueName = $queueOptions['name'];
                // Normalize the arguments of each queue individually: the 'arguments' key lives in
                // the options of a queue, not in the name => options map handed to setQueues().
                $queues[$queueName] = $this->normalizeArgumentKeys($queueOptions);
                $queues[$queueName]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
                $callbacks[] = $queueOptions['callback'];
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.multi_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setQueues', array($queues));

            if ($consumer['queues_provider']) {
                $definition->addMethodCall(
                    'setQueuesProvider',
                    array(new Reference($consumer['queues_provider']))
                );
            }

            $this->addQosOptionsCall($definition, $consumer);
            $this->addConsumerLifecycleCalls($definition, $consumer);
            $this->injectConnectionAndChannel($definition, $key, $consumer['connection']);

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($name, $definition);
            if ($consumer['queues_provider']) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
            }
            foreach ($callbacks as $callback) {
                $this->addDequeuerAwareCall($callback, $name);
            }
        }
    }

    /**
     * Registers one service per configured dynamic consumer.
     *
     * @throws InvalidConfigurationException when 'queue_options_provider' is not configured
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // Name the key that is actually checked so the message points at the right option.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            $this->addQosOptionsCall($definition, $consumer);

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            $this->addConsumerLifecycleCalls($definition, $consumer);
            $this->injectConnectionAndChannel($definition, $key, $consumer['connection']);

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }

    /**
     * Registers one service per configured batch consumer. The callback method is 'batchExecute'.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

            if (!isset($consumer['exchange_options'])) {
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
            }

            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.batch_consumer')
                ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
                ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
                ->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ))
            ;

            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }

            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $this->injectConnectionAndChannel($definition, $key, $consumer['connection']);

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
        }
    }

    /**
     * Registers one service per configured anonymous consumer.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.anon_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
            $this->injectConnectionAndChannel($definition, $key, $anon['connection']);

            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $name);
        }
    }

    /**
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
     * parameter. So we revert the change for right configurations.
     *
     * Only the keys of the 'arguments' entry are rewritten; a string 'arguments' value (old
     * configuration format) is converted to an array first.
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (isset($config['arguments'])) {
            $arguments = $config['arguments'];
            // support for old configuration
            if (is_string($arguments)) {
                $arguments = $this->argumentsStringAsArray($arguments);
            }

            $newArguments = array();
            foreach ($arguments as $key => $value) {
                // str_replace leaves keys without '_' untouched, so no pre-check is needed.
                $newArguments[str_replace('_', '-', $key)] = $value;
            }
            $config['arguments'] = $newArguments;
        }

        return $config;
    }

    /**
     * Support for arguments provided as string. Support for old configuration files.
     *
     * The string is a comma separated list of "name:value[:type]" fragments; the type
     * defaults to 'S'. Each fragment becomes name => array(type, value).
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments)
    {
        $argumentsArray = array();

        foreach (explode(',', $arguments) as $argument) {
            $argumentPair = explode(':', $argument);
            $type = isset($argumentPair[2]) ? $argumentPair[2] : 'S';
            // A fragment without ':' has no value part; keep the resulting null value
            // without reading an undefined offset.
            $value = isset($argumentPair[1]) ? $argumentPair[1] : null;
            $argumentsArray[$argumentPair[0]] = array($type, $value);
        }

        return $argumentsArray;
    }

    /**
     * Registers one public service per configured RPC client.
     */
    protected function loadRpcClients()
    {
        foreach ($this->config['rpc_clients'] as $key => $client) {
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $definition->setLazy($client['lazy']);
            $definition
                ->addTag('old_sound_rabbit_mq.rpc_client')
                ->addMethodCall('initClient', array($client['expect_serialized_response']));
            $this->injectConnectionAndChannel($definition, $key, $client['connection']);
            if (array_key_exists('unserializer', $client)) {
                $definition->addMethodCall('setUnserializer', array($client['unserializer']));
            }
            if (array_key_exists('direct_reply_to', $client)) {
                $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
            }
            $definition->setPublic(true);

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
        }
    }

    /**
     * Registers one public service per configured RPC server.
     */
    protected function loadRpcServers()
    {
        foreach ($this->config['rpc_servers'] as $key => $server) {
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.rpc_server')
                ->addMethodCall('initServer', array($key))
                ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
            $this->injectConnectionAndChannel($definition, $key, $server['connection']);
            $this->addQosOptionsCall($definition, $server);
            if (array_key_exists('exchange_options', $server)) {
                $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
            }
            if (array_key_exists('queue_options', $server)) {
                $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
            }
            if (array_key_exists('serializer', $server)) {
                $definition->addMethodCall('setSerializer', array($server['serializer']));
            }
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
        }
    }

    /**
     * Registers a private logged channel service for the given name, records its id for the
     * data collector and appends a reference to it to the definition arguments.
     *
     * @param Definition $definition     definition receiving the channel as an extra argument
     * @param string     $name           suffix of the channel service id
     * @param string     $connectionName configuration key of the connection the channel uses
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $channel
            ->setPublic(false)
            ->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($channel, $connectionName);

        $this->container->setDefinition($id, $channel);

        $this->channelIds[] = $id;
        $definition->addArgument(new Reference($id));
    }

    /**
     * Appends a reference to the named connection service to the definition arguments.
     *
     * @param Definition $definition
     * @param string     $connectionName configuration key of the connection
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
    }

    /**
     * @return string
     */
    public function getAlias()
    {
        return 'old_sound_rabbit_mq';
    }

    /**
     * Add proper dequeuer aware call
     *
     * Does nothing when the callback is not a service known to the container, when its
     * definition has no loadable class, or when that class is not dequeuer aware.
     *
     * @param string $callback
     * @param string $name
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);
        $callbackClass = $callbackDefinition->getClass();
        // A definition may carry no class, or a name that cannot be loaded; reflecting on it
        // would throw and abort the whole container build, so such callbacks are skipped.
        if (!is_string($callbackClass) || !(class_exists($callbackClass) || interface_exists($callbackClass))) {
            return;
        }

        $refClass = new \ReflectionClass($callbackClass);
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
        }
    }

    /**
     * Tags the definition for the 'phpamqplib' monolog channel and injects the logger
     * service through setLogger(); the reference is ignored when the logger does not exist.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $definition->addTag('monolog.logger', array(
            'channel' => 'phpamqplib'
        ));
        $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
    }

    /**
     * Injects the connection and, when the collector is enabled, a logged channel.
     *
     * @param Definition $definition
     * @param string     $key            name used for the logged channel service id
     * @param string     $connectionName configuration key of the connection
     */
    private function injectConnectionAndChannel(Definition $definition, $key, $connectionName)
    {
        $this->injectConnection($definition, $connectionName);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $connectionName);
        }
    }

    /**
     * Adds the setQosOptions call when the given configuration has a 'qos_options' entry.
     *
     * @param Definition $definition
     * @param array      $options consumer or server configuration
     */
    private function addQosOptionsCall(Definition $definition, array $options)
    {
        if (array_key_exists('qos_options', $options)) {
            $definition->addMethodCall('setQosOptions', array(
                $options['qos_options']['prefetch_size'],
                $options['qos_options']['prefetch_count'],
                $options['qos_options']['global']
            ));
        }
    }

    /**
     * Adds the optional timeout, graceful max execution and fabric setup calls shared by the
     * regular, multiple and dynamic consumers, in the order those consumers expect them.
     *
     * @param Definition $definition
     * @param array      $consumer consumer configuration
     */
    private function addConsumerLifecycleCalls(Definition $definition, array $consumer)
    {
        if (isset($consumer['idle_timeout'])) {
            $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
        }
        if (isset($consumer['idle_timeout_exit_code'])) {
            $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
        }
        if (isset($consumer['timeout_wait'])) {
            $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
        }
        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($consumer['graceful_max_execution']['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($consumer['graceful_max_execution']['exit_code'])
            );
        }
        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
    }

    /**
     * Get default AMQP exchange options
     *
     * @return array
     */
    protected function getDefaultExchangeOptions()
    {
        return array(
            'name' => '',
            'type' => 'direct',
            'passive' => true,
            'declare' => false
        );
    }

    /**
     * Get default AMQP queue options
     *
     * @return array
     */
    protected function getDefaultQueueOptions()
    {
        return array(
            'name' => '',
            'declare' => false
        );
    }
}
||
678 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.