1 | <?php |
||||||
2 | |||||||
3 | namespace OldSound\RabbitMqBundle\DependencyInjection; |
||||||
4 | |||||||
5 | use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
||||||
6 | use Symfony\Component\DependencyInjection\ContainerInterface; |
||||||
7 | use Symfony\Component\DependencyInjection\Extension\Extension; |
||||||
8 | use Symfony\Component\DependencyInjection\ContainerBuilder; |
||||||
9 | use Symfony\Component\DependencyInjection\Definition; |
||||||
10 | use Symfony\Component\DependencyInjection\Reference; |
||||||
11 | use Symfony\Component\DependencyInjection\Loader\XmlFileLoader; |
||||||
12 | use Symfony\Component\Config\FileLocator; |
||||||
13 | |||||||
14 | /** |
||||||
15 | * OldSoundRabbitMqExtension. |
||||||
16 | * |
||||||
17 | * @author Alvaro Videla |
||||||
18 | * @author Marc Weistroff <[email protected]> |
||||||
19 | */ |
||||||
20 | class OldSoundRabbitMqExtension extends Extension |
||||||
21 | { |
||||||
/**
 * Container currently being configured; assigned at the start of load().
 *
 * @var ContainerBuilder
 */
private $container;

/**
 * Whether the data collector is enabled (copied from the 'enable_collector' option in load()).
 *
 * @var bool
 */
private $collectorEnabled;

/**
 * Service ids of the logged channels registered through injectLoggedChannel().
 *
 * @var string[]
 */
private $channelIds = array();

/**
 * Processed bundle configuration, filled in by load().
 *
 * @var array
 */
private $config = array();
||||||
35 | |||||||
/**
 * Loads the bundle configuration and registers all RabbitMQ service definitions.
 *
 * The base services come from Resources/config/rabbitmq.xml; connections, bindings,
 * producers, the various consumers and the RPC clients/servers are then derived from the
 * processed configuration. The data collector definition is fed with the logged channels
 * when the collector is enabled and at least one channel was registered, and removed
 * otherwise.
 *
 * @param array            $configs   Raw configuration arrays for this extension
 * @param ContainerBuilder $container Container the definitions are registered in
 */
public function load(array $configs, ContainerBuilder $container)
{
    $this->container = $container;
    // Reset per-load state: without this, a second load() on the same extension instance
    // would hand channel ids registered for a previous container to the data collector.
    $this->channelIds = array();

    $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
    $loader->load('rabbitmq.xml');

    $configuration = $this->getConfiguration($configs, $container);
    $this->config = $this->processConfiguration($configuration, $configs);

    $this->collectorEnabled = $this->config['enable_collector'];

    $this->loadConnections();
    $this->loadBindings();
    $this->loadProducers();
    $this->loadConsumers();
    $this->loadMultipleConsumers();
    $this->loadDynamicConsumers();
    $this->loadBatchConsumers();
    $this->loadAnonConsumers();
    $this->loadRpcClients();
    $this->loadRpcServers();

    if ($this->collectorEnabled && $this->channelIds) {
        // The same channel id may have been registered more than once; reference each id once.
        $channels = array();
        foreach (array_unique($this->channelIds) as $id) {
            $channels[] = new Reference($id);
        }

        $definition = $this->container->getDefinition('old_sound_rabbit_mq.data_collector');
        $definition->replaceArgument(0, $channels);
    } else {
        $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
    }
}
|||||
71 | |||||||
/**
 * Builds the configuration object for this bundle, rooted at the extension alias.
 *
 * Neither argument is read by this implementation.
 *
 * @param array            $config
 * @param ContainerBuilder $container
 *
 * @return Configuration
 */
public function getConfiguration(array $config, ContainerBuilder $container)
{
    $rootName = $this->getAlias();

    return new Configuration($rootName);
}
||||||
76 | |||||||
/**
 * Registers, for every configured connection, a private connection factory service and a
 * public, tagged connection service that is created through that factory.
 */
protected function loadConnections()
{
    foreach ($this->config['connections'] as $key => $connection) {
        // Select the class parameter: socket vs. regular connection, lazy vs. non-lazy.
        $classSuffix = 'connection.class';
        if ($connection['use_socket']) {
            $classSuffix = 'socket_connection.class';
        }
        $classPrefix = $connection['lazy'] ? 'old_sound_rabbit_mq.lazy.' : 'old_sound_rabbit_mq.';
        $classParam = '%' . $classPrefix . $classSuffix . '%';

        $factoryId = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
        $factory = new Definition(
            '%old_sound_rabbit_mq.connection_factory.class%',
            array($classParam, $connection)
        );
        if (isset($connection['connection_parameters_provider'])) {
            $factory->addArgument(new Reference($connection['connection_parameters_provider']));
            // NOTE(review): the factory arguments above already hold their own copy of
            // $connection, so this unset only touches the loop variable — confirm whether the
            // key was meant to be stripped before the definition was built.
            unset($connection['connection_parameters_provider']);
        }
        $factory->setPublic(false);
        $this->container->setDefinition($factoryId, $factory);

        $connectionDefinition = new Definition($classParam);
        if (method_exists($connectionDefinition, 'setFactory')) {
            // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
            $connectionDefinition->setFactory(array(new Reference($factoryId), 'createConnection'));
        } else {
            // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
            $connectionDefinition->setFactoryService($factoryId);
            $connectionDefinition->setFactoryMethod('createConnection');
        }
        $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
        $connectionDefinition->setPublic(true);

        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $key);
        $this->container->setDefinition($connectionId, $connectionDefinition);
    }
}
|||||
112 | |||||||
/**
 * Registers one tagged binding service per configured binding.
 *
 * Nothing is registered in sandbox mode. The service id is derived from an md5 hash of the
 * key-sorted binding configuration.
 */
protected function loadBindings()
{
    if ($this->config['sandbox']) {
        return;
    }

    foreach ($this->config['bindings'] as $binding) {
        // Sort the keys so that the hash below does not depend on their order.
        ksort($binding);

        $definition = new Definition($binding['class']);
        // NOTE(review): every other call below is a setter while 'isNowait' reads like a
        // getter — confirm the binding class really expects this method name.
        $definition
            ->addTag('old_sound_rabbit_mq.binding')
            ->addMethodCall('setArguments', array($binding['arguments']))
            ->addMethodCall('setDestination', array($binding['destination']))
            ->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']))
            ->addMethodCall('setExchange', array($binding['exchange']))
            ->addMethodCall('isNowait', array($binding['nowait']))
            ->addMethodCall('setRoutingKey', array($binding['routing_key']));
        $this->injectConnection($definition, $binding['connection']);

        $hash = md5(json_encode($binding));
        if ($this->collectorEnabled) {
            // Open question carried over from the original author: are logged channels
            // actually needed in the context of a binding?
            $this->injectLoggedChannel($definition, $hash, $binding['connection']);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.binding.%s', $hash);
        $this->container->setDefinition($serviceId, $definition);
    }
}
|||||
137 | |||||||
/**
 * Registers one producer service per configured producer.
 *
 * In sandbox mode every producer is replaced by the fallback class. The fallback is
 * registered under the same service id, with the same visibility and the same optional
 * alias as the real producer, so that code fetching the producer keeps working when the
 * sandbox is switched on.
 */
protected function loadProducers()
{
    foreach ($this->config['producers'] as $key => $producer) {
        if ($this->config['sandbox']) {
            $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
            $definition->setPublic(true);
        } else {
            $definition = new Definition($producer['class']);
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.producer');
            //this producer doesn't define an exchange -> using AMQP Default
            if (!isset($producer['exchange_options'])) {
                $producer['exchange_options'] = $this->getDefaultExchangeOptions();
            }
            $definition->addMethodCall(
                'setExchangeOptions',
                array($this->normalizeArgumentKeys($producer['exchange_options']))
            );
            //this producer doesn't define a queue -> using AMQP Default
            if (!isset($producer['queue_options'])) {
                $producer['queue_options'] = $this->getDefaultQueueOptions();
            }
            $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
            $this->injectConnection($definition, $producer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $producer['connection']);
            }
            if (!$producer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($producer['enable_logger']) {
                $this->injectLogger($definition);
            }
        }

        $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
        $this->container->setDefinition($producerServiceName, $definition);

        // Previously only registered outside sandbox mode, which left the alias undefined
        // whenever the sandbox was enabled.
        if (null !== $producer['service_alias']) {
            $this->container->setAlias($producer['service_alias'], $producerServiceName);
        }
    }
}
|||||
182 | |||||||
/**
 * Registers one public consumer service per entry of the 'consumers' configuration.
 *
 * Exchange and queue options fall back to the default options when not configured; the
 * remaining options only produce a method call when they are present.
 */
protected function loadConsumers()
{
    // Options that map one-to-one onto a setter when they are set.
    $optionalSetters = array(
        'idle_timeout' => 'setIdleTimeout',
        'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
        'timeout_wait' => 'setTimeoutWait',
    );

    foreach ($this->config['consumers'] as $key => $consumer) {
        //this consumer doesn't define an exchange -> using AMQP Default
        if (!isset($consumer['exchange_options'])) {
            $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
        }
        $exchangeOptions = $this->normalizeArgumentKeys($consumer['exchange_options']);

        //this consumer doesn't define a queue -> using AMQP Default
        if (!isset($consumer['queue_options'])) {
            $consumer['queue_options'] = $this->getDefaultQueueOptions();
        }
        $queueOptions = $this->normalizeArgumentKeys($consumer['queue_options']);

        $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
        $definition
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.consumer')
            ->addMethodCall('setExchangeOptions', array($exchangeOptions))
            ->addMethodCall('setQueueOptions', array($queueOptions))
            ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

        if (array_key_exists('qos_options', $consumer)) {
            $qos = $consumer['qos_options'];
            $definition->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );
        }

        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, array($consumer[$option]));
            }
        }

        if (isset($consumer['graceful_max_execution'])) {
            $graceful = $consumer['graceful_max_execution'];
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($graceful['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($graceful['exit_code'])
            );
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }

        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
        $this->container->setDefinition($serviceId, $definition);
        $this->addDequeuerAwareCall($consumer['callback'], $serviceId);
    }
}
|||||
247 | |||||||
/**
 * Registers one public multi-queue consumer service per entry of 'multiple_consumers'.
 *
 * Queues are indexed by their configured name and their callback is turned into a
 * [service reference, 'execute'] callable.
 *
 * @throws InvalidConfigurationException When neither 'queues' nor 'queues_provider' is set
 */
protected function loadMultipleConsumers()
{
    // Options that map one-to-one onto a setter when they are set.
    $optionalSetters = array(
        'idle_timeout' => 'setIdleTimeout',
        'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
        'timeout_wait' => 'setTimeoutWait',
    );

    foreach ($this->config['multiple_consumers'] as $key => $consumer) {
        if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
            throw new InvalidConfigurationException(
                "Error on loading $key multiple consumer. " .
                "Either 'queues' or 'queues_provider' parameters should be defined."
            );
        }

        $queues = array();
        $callbacks = array();
        foreach ($consumer['queues'] as $queueOptions) {
            $callbackId = $queueOptions['callback'];
            $callbacks[] = $callbackId;

            $queueOptions['callback'] = array(new Reference($callbackId), 'execute');
            $queues[$queueOptions['name']] = $queueOptions;
        }

        $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.multi_consumer');
        $definition->addMethodCall(
            'setExchangeOptions',
            array($this->normalizeArgumentKeys($consumer['exchange_options']))
        );
        $definition->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));

        if ($consumer['queues_provider']) {
            $definition->addMethodCall(
                'setQueuesProvider',
                array(new Reference($consumer['queues_provider']))
            );
        }

        if (array_key_exists('qos_options', $consumer)) {
            $qos = $consumer['qos_options'];
            $definition->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );
        }

        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, array($consumer[$option]));
            }
        }

        if (isset($consumer['graceful_max_execution'])) {
            $graceful = $consumer['graceful_max_execution'];
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($graceful['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($graceful['exit_code'])
            );
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }

        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
        $this->container->setDefinition($serviceId, $definition);

        // The provider first, then every queue callback, may receive the consumer as dequeuer.
        if ($consumer['queues_provider']) {
            $this->addDequeuerAwareCall($consumer['queues_provider'], $serviceId);
        }
        foreach ($callbacks as $callbackId) {
            $this->addDequeuerAwareCall($callbackId, $serviceId);
        }
    }
}
|||||
332 | |||||||
/**
 * Registers one public dynamic consumer service per entry of 'dynamic_consumers'.
 *
 * A dynamic consumer gets its queue options from the service configured under
 * 'queue_options_provider', which is therefore mandatory.
 *
 * @throws InvalidConfigurationException When 'queue_options_provider' is empty
 */
protected function loadDynamicConsumers()
{
    foreach ($this->config['dynamic_consumers'] as $key => $consumer) {

        if (empty($consumer['queue_options_provider'])) {
            // The message names the option that is actually checked; it used to point at a
            // non-existent 'queue_provider' option.
            throw new InvalidConfigurationException(
                "Error on loading $key dynamic consumer. " .
                "'queue_options_provider' parameter should be defined."
            );
        }

        $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
        $definition
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.consumer')
            ->addTag('old_sound_rabbit_mq.dynamic_consumer')
            ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
            ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

        if (array_key_exists('qos_options', $consumer)) {
            $definition->addMethodCall('setQosOptions', array(
                $consumer['qos_options']['prefetch_size'],
                $consumer['qos_options']['prefetch_count'],
                $consumer['qos_options']['global']
            ));
        }

        $definition->addMethodCall(
            'setQueueOptionsProvider',
            array(new Reference($consumer['queue_options_provider']))
        );

        if (isset($consumer['idle_timeout'])) {
            $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
        }
        if (isset($consumer['idle_timeout_exit_code'])) {
            $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
        }
        if (isset($consumer['timeout_wait'])) {
            $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
        }
        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($consumer['graceful_max_execution']['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($consumer['graceful_max_execution']['exit_code'])
            );
        }
        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }

        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
        $this->container->setDefinition($name, $definition);
        // Both the callback and the options provider may receive the consumer as dequeuer.
        $this->addDequeuerAwareCall($consumer['callback'], $name);
        $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
    }
}
|||||
404 | |||||||
/**
 * Registers one public batch consumer service per entry of 'batch_consumers'.
 *
 * The callback is invoked through its 'batchExecute' method. Exchange options fall back to
 * the default exchange options when not configured.
 */
protected function loadBatchConsumers()
{
    foreach ($this->config['batch_consumers'] as $key => $consumer) {
        if (!isset($consumer['exchange_options'])) {
            $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
        }

        $qos = $consumer['qos_options'];

        $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.batch_consumer');
        $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
        $definition->addMethodCall('setPrefetchCount', array($qos['prefetch_count']));
        $definition->addMethodCall(
            'setCallback',
            array(array(new Reference($consumer['callback']), 'batchExecute'))
        );
        $definition->addMethodCall(
            'setExchangeOptions',
            array($this->normalizeArgumentKeys($consumer['exchange_options']))
        );
        $definition->addMethodCall(
            'setQueueOptions',
            array($this->normalizeArgumentKeys($consumer['queue_options']))
        );
        $definition->addMethodCall(
            'setQosOptions',
            array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
        );

        // Optional settings: only forwarded when configured.
        $optionalSetters = array(
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'idle_timeout' => 'setIdleTimeout',
        );
        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, array($consumer[$option]));
            }
        }

        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($consumer['graceful_max_execution']['timeout'])
            );
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        if ($consumer['keep_alive']) {
            $definition->addMethodCall('keepAlive');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }

        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
        $this->container->setDefinition($serviceId, $definition);
    }
}
|||||
465 | |||||||
/**
 * Registers one public anonymous consumer service per entry of 'anon_consumers'.
 */
protected function loadAnonConsumers()
{
    foreach ($this->config['anon_consumers'] as $key => $anon) {
        $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
        $callback = array(new Reference($anon['callback']), 'execute');

        $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.anon_consumer');
        $definition->addMethodCall('setExchangeOptions', array($exchangeOptions));
        $definition->addMethodCall('setCallback', array($callback));

        $this->injectConnection($definition, $anon['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $anon['connection']);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
        $this->container->setDefinition($serviceId, $definition);
        $this->addDequeuerAwareCall($anon['callback'], $serviceId);
    }
}
|||||
486 | |||||||
/**
 * Restores dashes in the keys of the 'arguments' entry of an options array.
 *
 * Symfony 2 converts '-' to '_' in configuration keys, which breaks arguments such as
 * x-ha-policy; every '_' in an argument key is therefore turned back into '-'. Arguments
 * given as a string (old configuration format) are converted to an array first. Options
 * without an 'arguments' entry are returned unchanged.
 *
 * @param array $config
 *
 * @return array
 */
private function normalizeArgumentKeys(array $config)
{
    if (!isset($config['arguments'])) {
        return $config;
    }

    $arguments = $config['arguments'];
    // support for old configuration
    if (is_string($arguments)) {
        $arguments = $this->argumentsStringAsArray($arguments);
    }

    $normalized = array();
    foreach ($arguments as $name => $value) {
        // A key without underscores comes out of str_replace() unchanged.
        $normalized[str_replace('_', '-', $name)] = $value;
    }
    $config['arguments'] = $normalized;

    return $config;
}
||||||
515 | |||||||
/**
 * Parses arguments given as a string, as found in old configuration files.
 *
 * The string is a comma-separated list of "name:value" or "name:value:type" pairs; each
 * pair becomes an entry name => array(type, value), the type defaulting to 'S'.
 *
 * @deprecated
 * @param string $arguments
 * @return array
 */
private function argumentsStringAsArray($arguments)
{
    $parsed = array();

    foreach (explode(',', $arguments) as $pair) {
        $parts = explode(':', $pair);
        $parsed[$parts[0]] = array(
            isset($parts[2]) ? $parts[2] : 'S',
            $parts[1],
        );
    }

    return $parsed;
}
||||||
539 | |||||||
/**
 * Registers one public RPC client service per entry of 'rpc_clients'.
 */
protected function loadRpcClients()
{
    // Options forwarded to a setter whenever the key is present in the client configuration.
    $optionalSetters = array(
        'unserializer' => 'setUnserializer',
        'direct_reply_to' => 'setDirectReplyTo',
    );

    foreach ($this->config['rpc_clients'] as $key => $client) {
        $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
        $definition->setLazy($client['lazy']);
        $definition->addTag('old_sound_rabbit_mq.rpc_client');
        $definition->addMethodCall('initClient', array($client['expect_serialized_response']));

        $this->injectConnection($definition, $client['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $client['connection']);
        }

        foreach ($optionalSetters as $option => $setter) {
            if (array_key_exists($option, $client)) {
                $definition->addMethodCall($setter, array($client[$option]));
            }
        }
        $definition->setPublic(true);

        $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $key);
        $this->container->setDefinition($serviceId, $definition);
    }
}
|||||
563 | |||||||
/**
 * Registers one public RPC server service per entry of 'rpc_servers'.
 *
 * The server is initialised with its configuration key and calls the 'execute' method of
 * the configured callback service.
 */
protected function loadRpcServers()
{
    // Options forwarded to a setter whenever the key is present in the server configuration.
    $optionalSetters = array(
        'exchange_options' => 'setExchangeOptions',
        'queue_options' => 'setQueueOptions',
        'serializer' => 'setSerializer',
    );

    foreach ($this->config['rpc_servers'] as $key => $server) {
        $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.rpc_server');
        $definition->addMethodCall('initServer', array($key));
        $definition->addMethodCall(
            'setCallback',
            array(array(new Reference($server['callback']), 'execute'))
        );

        $this->injectConnection($definition, $server['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $server['connection']);
        }

        if (array_key_exists('qos_options', $server)) {
            $qos = $server['qos_options'];
            $definition->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );
        }

        foreach ($optionalSetters as $option => $setter) {
            if (array_key_exists($option, $server)) {
                $definition->addMethodCall($setter, array($server[$option]));
            }
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $key);
        $this->container->setDefinition($serviceId, $definition);
    }
}
|||||
597 | |||||||
/**
 * Registers a private logged channel service and appends a reference to it to the
 * arguments of the given definition.
 *
 * The channel id is also recorded in $channelIds.
 *
 * @param Definition $definition     Definition that receives the channel as next argument
 * @param string     $name           Suffix of the channel service id
 * @param string     $connectionName Name of the connection the channel is built on
 */
protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
{
    $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

    $channelDefinition = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
    $channelDefinition->setPublic(false);
    $channelDefinition->addTag('old_sound_rabbit_mq.logged_channel');
    $this->injectConnection($channelDefinition, $connectionName);

    $this->container->setDefinition($channelId, $channelDefinition);
    $this->channelIds[] = $channelId;

    $definition->addArgument(new Reference($channelId));
}
|||||
612 | |||||||
/**
 * Appends a reference to the named connection service to the definition's arguments.
 *
 * @param Definition $definition
 * @param string     $connectionName Name of a configured connection
 */
protected function injectConnection(Definition $definition, $connectionName)
{
    $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);

    $definition->addArgument(new Reference($connectionId));
}
|||||
617 | |||||||
/**
 * Returns the alias of this extension.
 *
 * @return string
 */
public function getAlias()
{
    $alias = 'old_sound_rabbit_mq';

    return $alias;
}
||||||
622 | |||||||
/**
 * Add proper dequeuer aware call
 *
 * When the callback service is known to the container being built and its class
 * implements DequeuerAwareInterface, a "setDequeuer" call referencing the consumer is added
 * to the callback definition. Nothing happens when the callback is unknown or when its
 * class cannot be determined or loaded (no class on the definition, unresolvable parameter
 * placeholder, missing class); such definitions used to trigger a ReflectionException.
 *
 * @param string $callback Service id of the callback
 * @param string $name     Service id of the consumer to inject as dequeuer
 */
protected function addDequeuerAwareCall($callback, $name)
{
    if (!$this->container->has($callback)) {
        return;
    }

    $callbackDefinition = $this->container->findDefinition($callback);

    $class = $callbackDefinition->getClass();
    if (is_string($class) && false !== strpos($class, '%')) {
        // The class may be given as a "%parameter%" placeholder; resolve it before reflecting.
        try {
            $class = $this->container->getParameterBag()->resolveValue($class);
        } catch (\InvalidArgumentException $e) {
            // Parameter not (yet) defined: the class is unknown at this point.
            return;
        }
    }

    if (!is_string($class) || (!class_exists($class) && !interface_exists($class))) {
        return;
    }

    $refClass = new \ReflectionClass($class);
    if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
        $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
    }
}
||||||
641 | |||||||
/**
 * Tags the definition for the 'phpamqplib' monolog channel and adds a "setLogger" call.
 *
 * The 'logger' service is referenced with IGNORE_ON_INVALID_REFERENCE.
 *
 * @param Definition $definition
 */
private function injectLogger(Definition $definition)
{
    $tagAttributes = array(
        'channel' => 'phpamqplib'
    );
    $definition->addTag('monolog.logger', $tagAttributes);

    $logger = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);
    $definition->addMethodCall('setLogger', array($logger));
}
|||||
649 | |||||||
/**
 * Exchange options used when none are configured: an unnamed, passive 'direct' exchange
 * that is not declared.
 *
 * @return array
 */
protected function getDefaultExchangeOptions()
{
    $options = array();
    $options['name'] = '';
    $options['type'] = 'direct';
    $options['passive'] = true;
    $options['declare'] = false;

    return $options;
}
||||||
664 | |||||||
/**
 * Queue options used when none are configured: an unnamed queue that is not declared.
 *
 * @return array
 */
protected function getDefaultQueueOptions()
{
    $options = array();
    $options['name'] = '';
    $options['declare'] = false;

    return $options;
}
||||||
677 | } |
||||||
678 |
This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.
This is most likely a typographical error or the method has been renamed.