eMAGTechLabs /
RabbitMqBundle
| 1 | <?php |
||||||
| 2 | |||||||
| 3 | namespace OldSound\RabbitMqBundle\DependencyInjection; |
||||||
| 4 | |||||||
| 5 | use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
||||||
| 6 | use Symfony\Component\DependencyInjection\ContainerInterface; |
||||||
| 7 | use Symfony\Component\DependencyInjection\Extension\Extension; |
||||||
| 8 | use Symfony\Component\DependencyInjection\ContainerBuilder; |
||||||
| 9 | use Symfony\Component\DependencyInjection\Definition; |
||||||
| 10 | use Symfony\Component\DependencyInjection\Reference; |
||||||
| 11 | use Symfony\Component\DependencyInjection\Loader\XmlFileLoader; |
||||||
| 12 | use Symfony\Component\Config\FileLocator; |
||||||
| 13 | |||||||
| 14 | /** |
||||||
| 15 | * OldSoundRabbitMqExtension. |
||||||
| 16 | * |
||||||
| 17 | * @author Alvaro Videla |
||||||
| 18 | * @author Marc Weistroff <[email protected]> |
||||||
| 19 | */ |
||||||
| 20 | class OldSoundRabbitMqExtension extends Extension |
||||||
| 21 | { |
||||||
/**
 * Container being built; assigned at the start of load().
 *
 * @var ContainerBuilder
 */
private $container;

/**
 * Whether the data collector is enabled (read from the 'enable_collector' option in load()).
 *
 * @var bool
 */
private $collectorEnabled;

/**
 * Service ids of the logged channels registered through injectLoggedChannel().
 *
 * @var string[]
 */
private $channelIds = array();

/**
 * Processed bundle configuration; populated in load().
 *
 * @var array
 */
private $config = array();
||||||
| 35 | |||||||
/**
 * Loads the bundle configuration and registers all RabbitMQ service definitions.
 *
 * Stores the container and the processed configuration on the instance, runs
 * every load*() step, and finally either passes the collected logged-channel
 * references to the data collector or removes the collector definition.
 *
 * @param array            $configs   Raw configuration arrays to process
 * @param ContainerBuilder $container Container being built
 */
public function load(array $configs, ContainerBuilder $container)
{
    $this->container = $container;

    $locator = new FileLocator(array(__DIR__ . '/../Resources/config'));
    $xmlLoader = new XmlFileLoader($this->container, $locator);
    $xmlLoader->load('rabbitmq.xml');

    $this->config = $this->processConfiguration($this->getConfiguration($configs, $container), $configs);
    $this->collectorEnabled = $this->config['enable_collector'];

    $this->loadConnections();
    $this->loadBindings();
    $this->loadProducers();
    $this->loadConsumers();
    $this->loadMultipleConsumers();
    $this->loadDynamicConsumers();
    $this->loadBatchConsumers();
    $this->loadAnonConsumers();
    $this->loadRpcClients();
    $this->loadRpcServers();

    // Without an enabled collector or any logged channel the collector service is dropped.
    if (!$this->collectorEnabled || !$this->channelIds) {
        $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');

        return;
    }

    $channelReferences = array();
    foreach (array_unique($this->channelIds) as $channelId) {
        $channelReferences[] = new Reference($channelId);
    }

    $collector = $container->getDefinition('old_sound_rabbit_mq.data_collector');
    $collector->replaceArgument(0, $channelReferences);
}
|||||
| 71 | |||||||
/**
 * Builds the Configuration object for this bundle, keyed by the extension alias.
 *
 * Neither argument is used by this implementation.
 *
 * @param array            $config    Raw configuration arrays (unused)
 * @param ContainerBuilder $container Container being built (unused)
 *
 * @return Configuration
 */
public function getConfiguration(array $config, ContainerBuilder $container)
{
    $alias = $this->getAlias();

    return new Configuration($alias);
}
||||||
| 76 | |||||||
/**
 * Registers, for every configured connection, a private connection-factory
 * service and a public, tagged connection service created through that factory.
 */
protected function loadConnections()
{
    foreach ($this->config['connections'] as $key => $connection) {
        $classKey = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
        if ($connection['lazy']) {
            $classParam = '%old_sound_rabbit_mq.lazy.'.$classKey.'%';
        } else {
            $classParam = '%old_sound_rabbit_mq.'.$classKey.'%';
        }

        $factory = new Definition(
            '%old_sound_rabbit_mq.connection_factory.class%',
            array($classParam, $connection)
        );
        if (isset($connection['connection_parameters_provider'])) {
            $factory->addArgument(new Reference($connection['connection_parameters_provider']));
            // NOTE(review): this only alters the loop-local copy; the array already
            // handed to the factory definition above was passed by value and still
            // contains this key. Confirm whether it was meant to be stripped earlier.
            unset($connection['connection_parameters_provider']);
        }
        $factory->setPublic(false);
        $factoryId = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
        $this->container->setDefinition($factoryId, $factory);

        $connectionDefinition = new Definition($classParam);
        if (method_exists($connectionDefinition, 'setFactory')) {
            // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
            $connectionDefinition->setFactory(array(new Reference($factoryId), 'createConnection'));
        } else {
            // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
            // (static analysis reports these two methods as missing on current Definition classes)
            $connectionDefinition->setFactoryService($factoryId);
            $connectionDefinition->setFactoryMethod('createConnection');
        }
        $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
        $connectionDefinition->setPublic(true);

        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $key);
        $this->container->setDefinition($connectionId, $connectionDefinition);
    }
}
|||||
| 112 | |||||||
/**
 * Registers one tagged binding service per configured binding.
 *
 * Nothing is registered in sandbox mode. The service id suffix is the md5 of
 * the JSON-encoded, key-sorted binding options.
 */
protected function loadBindings()
{
    if ($this->config['sandbox']) {
        return;
    }

    // Method name => binding option, in the order the calls are registered.
    // NOTE(review): 'isNowait' reads like a getter name; confirm that a setter
    // was not intended for the 'nowait' option.
    $methodCalls = array(
        'setArguments' => 'arguments',
        'setDestination' => 'destination',
        'setDestinationIsExchange' => 'destination_is_exchange',
        'setExchange' => 'exchange',
        'isNowait' => 'nowait',
        'setRoutingKey' => 'routing_key',
    );

    foreach ($this->config['bindings'] as $binding) {
        ksort($binding);

        $bindingDefinition = new Definition($binding['class']);
        $bindingDefinition->addTag('old_sound_rabbit_mq.binding');
        foreach ($methodCalls as $method => $option) {
            $bindingDefinition->addMethodCall($method, array($binding[$option]));
        }
        $this->injectConnection($bindingDefinition, $binding['connection']);

        $hash = md5(json_encode($binding));
        if ($this->collectorEnabled) {
            // in the context of a binding, I don't think logged channels are needed?
            $this->injectLoggedChannel($bindingDefinition, $hash, $binding['connection']);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.binding.%s', $hash);
        $this->container->setDefinition($serviceId, $bindingDefinition);
    }
}
|||||
| 137 | |||||||
/**
 * Registers one service per configured producer.
 *
 * In sandbox mode every producer id is bound to the fallback class and nothing
 * else is configured. Otherwise the producer class is set up with exchange and
 * queue options, its connection, optional logged channel and logger, and an
 * optional service alias.
 */
protected function loadProducers()
{
    if ($this->config['sandbox']) {
        foreach (array_keys($this->config['producers']) as $name) {
            $fallback = new Definition('%old_sound_rabbit_mq.fallback.class%');
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $name), $fallback);
        }

        return;
    }

    foreach ($this->config['producers'] as $name => $options) {
        $producer = new Definition($options['class']);
        $producer
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.producer');

        // this producer doesn't define an exchange -> using AMQP Default
        $exchangeOptions = isset($options['exchange_options'])
            ? $options['exchange_options']
            : $this->getDefaultExchangeOptions();
        $producer->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));

        // this producer doesn't define a queue -> using AMQP Default
        $queueOptions = isset($options['queue_options'])
            ? $options['queue_options']
            : $this->getDefaultQueueOptions();
        $producer->addMethodCall('setQueueOptions', array($queueOptions));

        $this->injectConnection($producer, $options['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($producer, $name, $options['connection']);
        }
        if (!$options['auto_setup_fabric']) {
            $producer->addMethodCall('disableAutoSetupFabric');
        }
        if ($options['enable_logger']) {
            $this->injectLogger($producer);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_producer', $name);
        $this->container->setDefinition($serviceId, $producer);

        if (null !== $options['service_alias']) {
            $this->container->setAlias($options['service_alias'], $serviceId);
        }
    }
}
|||||
| 182 | |||||||
/**
 * Registers one public, tagged consumer service per configured consumer and
 * passes the consumer id on to addDequeuerAwareCall() for its callback.
 */
protected function loadConsumers()
{
    // Optional option => setter, in the order the calls are registered.
    $optionalSetters = array(
        'idle_timeout' => 'setIdleTimeout',
        'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
        'timeout_wait' => 'setTimeoutWait',
    );

    foreach ($this->config['consumers'] as $name => $options) {
        $consumer = new Definition('%old_sound_rabbit_mq.consumer.class%');
        $consumer
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.consumer');

        // this consumer doesn't define an exchange -> using AMQP Default
        $exchangeOptions = isset($options['exchange_options'])
            ? $options['exchange_options']
            : $this->getDefaultExchangeOptions();
        $consumer->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));

        // this consumer doesn't define a queue -> using AMQP Default
        $queueOptions = isset($options['queue_options'])
            ? $options['queue_options']
            : $this->getDefaultQueueOptions();
        $consumer->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($queueOptions)));

        $consumer->addMethodCall('setCallback', array(array(new Reference($options['callback']), 'execute')));

        if (array_key_exists('qos_options', $options)) {
            $qos = $options['qos_options'];
            $consumer->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );
        }

        foreach ($optionalSetters as $option => $setter) {
            if (isset($options[$option])) {
                $consumer->addMethodCall($setter, array($options[$option]));
            }
        }

        if (isset($options['graceful_max_execution'])) {
            $graceful = $options['graceful_max_execution'];
            $consumer->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($graceful['timeout'])
            );
            $consumer->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($graceful['exit_code'])
            );
        }
        if (!$options['auto_setup_fabric']) {
            $consumer->addMethodCall('disableAutoSetupFabric');
        }

        $this->injectConnection($consumer, $options['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($consumer, $name, $options['connection']);
        }
        if ($options['enable_logger']) {
            $this->injectLogger($consumer);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $name);
        $this->container->setDefinition($serviceId, $consumer);
        $this->addDequeuerAwareCall($options['callback'], $serviceId);
    }
}
|||||
| 247 | |||||||
/**
 * Registers one public, tagged multi-queue consumer service per entry of the
 * 'multiple_consumers' configuration.
 *
 * @throws InvalidConfigurationException When neither 'queues' nor 'queues_provider' is set
 */
protected function loadMultipleConsumers()
{
    // Optional option => setter, in the order the calls are registered.
    $optionalSetters = array(
        'idle_timeout' => 'setIdleTimeout',
        'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
        'timeout_wait' => 'setTimeoutWait',
    );

    foreach ($this->config['multiple_consumers'] as $key => $consumer) {
        if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
            throw new InvalidConfigurationException(
                "Error on loading $key multiple consumer. " .
                "Either 'queues' or 'queues_provider' parameters should be defined."
            );
        }

        // Index the queues by name and swap each callback service id for a
        // [Reference, 'execute'] callable; remember the ids for the dequeuer calls.
        $queues = array();
        $callbackIds = array();
        foreach ($consumer['queues'] as $queue) {
            $callbackIds[] = $queue['callback'];
            $queue['callback'] = array(new Reference($queue['callback']), 'execute');
            $queues[$queue['name']] = $queue;
        }

        $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.multi_consumer');
        $definition->addMethodCall(
            'setExchangeOptions',
            array($this->normalizeArgumentKeys($consumer['exchange_options']))
        );
        $definition->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));

        if ($consumer['queues_provider']) {
            $definition->addMethodCall(
                'setQueuesProvider',
                array(new Reference($consumer['queues_provider']))
            );
        }

        if (array_key_exists('qos_options', $consumer)) {
            $qos = $consumer['qos_options'];
            $definition->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );
        }

        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, array($consumer[$option]));
            }
        }

        if (isset($consumer['graceful_max_execution'])) {
            $graceful = $consumer['graceful_max_execution'];
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($graceful['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($graceful['exit_code'])
            );
        }
        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }
        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
        $this->container->setDefinition($serviceId, $definition);

        if ($consumer['queues_provider']) {
            $this->addDequeuerAwareCall($consumer['queues_provider'], $serviceId);
        }
        foreach ($callbackIds as $callbackId) {
            $this->addDequeuerAwareCall($callbackId, $serviceId);
        }
    }
}
|||||
| 332 | |||||||
/**
 * Registers one public, tagged dynamic consumer service per entry of the
 * 'dynamic_consumers' configuration.
 *
 * Each consumer must name a 'queue_options_provider' service; that service and
 * the callback service are both passed to addDequeuerAwareCall().
 *
 * @throws InvalidConfigurationException When 'queue_options_provider' is empty
 */
protected function loadDynamicConsumers()
{
    foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
        if (empty($consumer['queue_options_provider'])) {
            // The message names the option that is actually checked above
            // (it previously referred to a non-existent 'queue_provider').
            throw new InvalidConfigurationException(
                "Error on loading $key dynamic consumer. " .
                "'queue_options_provider' parameter should be defined."
            );
        }

        $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
        $definition
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.consumer')
            ->addTag('old_sound_rabbit_mq.dynamic_consumer')
            ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
            ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

        if (array_key_exists('qos_options', $consumer)) {
            $definition->addMethodCall('setQosOptions', array(
                $consumer['qos_options']['prefetch_size'],
                $consumer['qos_options']['prefetch_count'],
                $consumer['qos_options']['global']
            ));
        }

        $definition->addMethodCall(
            'setQueueOptionsProvider',
            array(new Reference($consumer['queue_options_provider']))
        );

        if (isset($consumer['idle_timeout'])) {
            $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
        }
        if (isset($consumer['idle_timeout_exit_code'])) {
            $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
        }
        if (isset($consumer['timeout_wait'])) {
            $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
        }
        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($consumer['graceful_max_execution']['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($consumer['graceful_max_execution']['exit_code'])
            );
        }
        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }

        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
        $this->container->setDefinition($name, $definition);
        $this->addDequeuerAwareCall($consumer['callback'], $name);
        $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
    }
}
|||||
| 404 | |||||||
/**
 * Registers one public, tagged batch consumer service per entry of the
 * 'batch_consumers' configuration. The callback is bound to the callback
 * service's 'batchExecute' method.
 */
protected function loadBatchConsumers()
{
    foreach ($this->config['batch_consumers'] as $name => $options) {
        $batch = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

        $exchangeOptions = isset($options['exchange_options'])
            ? $options['exchange_options']
            : $this->getDefaultExchangeOptions();
        $qos = $options['qos_options'];

        $batch->setPublic(true);
        $batch->addTag('old_sound_rabbit_mq.base_amqp');
        $batch->addTag('old_sound_rabbit_mq.batch_consumer');
        $batch->addMethodCall('setTimeoutWait', array($options['timeout_wait']));
        $batch->addMethodCall('setPrefetchCount', array($qos['prefetch_count']));
        $batch->addMethodCall('setCallback', array(array(new Reference($options['callback']), 'batchExecute')));
        $batch->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));
        $batch->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($options['queue_options'])));
        $batch->addMethodCall(
            'setQosOptions',
            array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
        );

        // Optional option => setter, in the order the calls are registered.
        $optionalSetters = array(
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'idle_timeout' => 'setIdleTimeout',
        );
        foreach ($optionalSetters as $option => $setter) {
            if (isset($options[$option])) {
                $batch->addMethodCall($setter, array($options[$option]));
            }
        }

        if (isset($options['graceful_max_execution'])) {
            $batch->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($options['graceful_max_execution']['timeout'])
            );
        }
        if (!$options['auto_setup_fabric']) {
            $batch->addMethodCall('disableAutoSetupFabric');
        }
        if ($options['keep_alive']) {
            $batch->addMethodCall('keepAlive');
        }

        $this->injectConnection($batch, $options['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($batch, $name, $options['connection']);
        }
        if ($options['enable_logger']) {
            $this->injectLogger($batch);
        }

        $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $name), $batch);
    }
}
|||||
| 465 | |||||||
/**
 * Registers one public, tagged anonymous consumer service per entry of the
 * 'anon_consumers' configuration and passes its id on to
 * addDequeuerAwareCall() for the callback service.
 */
protected function loadAnonConsumers()
{
    foreach ($this->config['anon_consumers'] as $name => $options) {
        $consumer = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
        $consumer->setPublic(true);
        $consumer->addTag('old_sound_rabbit_mq.base_amqp');
        $consumer->addTag('old_sound_rabbit_mq.anon_consumer');
        $consumer->addMethodCall(
            'setExchangeOptions',
            array($this->normalizeArgumentKeys($options['exchange_options']))
        );
        $consumer->addMethodCall(
            'setCallback',
            array(array(new Reference($options['callback']), 'execute'))
        );

        $this->injectConnection($consumer, $options['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($consumer, $name, $options['connection']);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $name);
        $this->container->setDefinition($serviceId, $consumer);
        $this->addDequeuerAwareCall($options['callback'], $serviceId);
    }
}
|||||
| 486 | |||||||
/**
 * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
 * parameter. So the underscores in the keys of the 'arguments' entry are turned back into dashes.
 *
 * A string 'arguments' value (old configuration format) is first converted to an array.
 * Configurations without an 'arguments' entry are returned untouched.
 *
 * @param array $config
 *
 * @return array
 */
private function normalizeArgumentKeys(array $config)
{
    if (!isset($config['arguments'])) {
        return $config;
    }

    // support for old configuration
    $arguments = is_string($config['arguments'])
        ? $this->argumentsStringAsArray($config['arguments'])
        : $config['arguments'];

    $normalized = array();
    foreach ($arguments as $name => $value) {
        // Keys without an underscore come out of str_replace() unchanged.
        $normalized[str_replace('_', '-', (string) $name)] = $value;
    }
    $config['arguments'] = $normalized;

    return $config;
}
||||||
| 515 | |||||||
/**
 * Support for arguments provided as string. Support for old configuration files.
 *
 * The string is split on ',' into entries and each entry on ':' into
 * name, value and an optional type; the type defaults to 'S'. The result
 * maps each name to array(type, value).
 *
 * @deprecated
 * @param string $arguments
 * @return array
 */
private function argumentsStringAsArray($arguments)
{
    $parsed = array();

    foreach (explode(',', $arguments) as $entry) {
        $parts = explode(':', $entry);
        $type = isset($parts[2]) ? $parts[2] : 'S';
        $parsed[$parts[0]] = array($type, $parts[1]);
    }

    return $parsed;
}
||||||
| 539 | |||||||
/**
 * Registers one public, tagged RPC client service per entry of the
 * 'rpc_clients' configuration.
 */
protected function loadRpcClients()
{
    // Option => setter; registered only when the option key exists.
    $optionalSetters = array(
        'unserializer' => 'setUnserializer',
        'direct_reply_to' => 'setDirectReplyTo',
    );

    foreach ($this->config['rpc_clients'] as $name => $options) {
        $client = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
        $client
            ->setLazy($options['lazy'])
            ->addTag('old_sound_rabbit_mq.rpc_client')
            ->addMethodCall('initClient', array($options['expect_serialized_response']));

        $this->injectConnection($client, $options['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($client, $name, $options['connection']);
        }

        foreach ($optionalSetters as $option => $setter) {
            if (array_key_exists($option, $options)) {
                $client->addMethodCall($setter, array($options[$option]));
            }
        }
        $client->setPublic(true);

        $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $name), $client);
    }
}
|||||
| 563 | |||||||
/**
 * Registers one public, tagged RPC server service per entry of the
 * 'rpc_servers' configuration. The server is initialised with its
 * configuration key as name.
 */
protected function loadRpcServers()
{
    // Option => setter for values passed through unchanged when the key exists.
    $passThroughSetters = array(
        'exchange_options' => 'setExchangeOptions',
        'queue_options' => 'setQueueOptions',
        'serializer' => 'setSerializer',
    );

    foreach ($this->config['rpc_servers'] as $name => $options) {
        $server = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
        $server->setPublic(true);
        $server->addTag('old_sound_rabbit_mq.base_amqp');
        $server->addTag('old_sound_rabbit_mq.rpc_server');
        $server->addMethodCall('initServer', array($name));
        $server->addMethodCall('setCallback', array(array(new Reference($options['callback']), 'execute')));

        $this->injectConnection($server, $options['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($server, $name, $options['connection']);
        }

        if (array_key_exists('qos_options', $options)) {
            $qos = $options['qos_options'];
            $server->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );
        }
        foreach ($passThroughSetters as $option => $setter) {
            if (array_key_exists($option, $options)) {
                $server->addMethodCall($setter, array($options[$option]));
            }
        }

        $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $name), $server);
    }
}
|||||
| 597 | |||||||
/**
 * Create a private logged-channel service and pass it to the given definition.
 *
 * The channel is registered as "old_sound_rabbit_mq.channel.<name>", its id is
 * remembered in $this->channelIds, and a reference to it is appended to the
 * arguments of $definition.
 *
 * @param Definition $definition     Definition that receives the channel as an argument
 * @param string     $name           Suffix used to build the channel service id
 * @param string     $connectionName Name of the connection injected into the channel
 */
protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
{
    $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

    $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
    $loggedChannel->setPublic(false);
    $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
    $this->injectConnection($loggedChannel, $connectionName);

    $this->container->setDefinition($channelId, $loggedChannel);
    $this->channelIds[] = $channelId;

    $definition->addArgument(new Reference($channelId));
}
|||||
| 612 | |||||||
/**
 * Append a reference to a named connection service to the definition's arguments.
 *
 * @param Definition $definition     Definition that receives the connection
 * @param string     $connectionName Suffix of the "old_sound_rabbit_mq.connection.<name>" service id
 */
protected function injectConnection(Definition $definition, $connectionName)
{
    $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
    $connection = new Reference($connectionId);

    $definition->addArgument($connection);
}
|||||
| 617 | |||||||
/**
 * Return the alias of this extension.
 *
 * @return string
 */
public function getAlias()
{
    $alias = 'old_sound_rabbit_mq';

    return $alias;
}
||||||
| 622 | |||||||
/**
 * Add proper dequeuer aware call
 *
 * When the callback service is known to the container and its class implements
 * DequeuerAwareInterface, a "setDequeuer" method call referencing $name is added
 * to the callback definition. In every other case the container is left untouched.
 *
 * @param string $callback Service id of the callback to inspect
 * @param string $name     Service id handed to setDequeuer()
 *
 * @throws \ReflectionException When the resolved callback class cannot be loaded
 */
protected function addDequeuerAwareCall($callback, $name)
{
    if (!$this->container->has($callback)) {
        return;
    }

    $callbackDefinition = $this->container->findDefinition($callback);

    // The class may be declared as a "%parameter%" placeholder; reflection needs
    // the real class name, so resolve it through the parameter bag first.
    $callbackClass = $this->container->getParameterBag()->resolveValue($callbackDefinition->getClass());
    if (!is_string($callbackClass) || '' === $callbackClass) {
        // Definition without a class (e.g. built by a factory): nothing to inspect
        return;
    }

    $refClass = new \ReflectionClass($callbackClass);
    if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
        $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
    }
}
||||||
| 641 | |||||||
/**
 * Tag the definition for the "phpamqplib" monolog channel and add a setLogger() call.
 *
 * The "logger" reference is created with IGNORE_ON_INVALID_REFERENCE.
 *
 * @param Definition $definition Definition that should receive the logger
 */
private function injectLogger(Definition $definition)
{
    $tagAttributes = array('channel' => 'phpamqplib');
    $definition->addTag('monolog.logger', $tagAttributes);

    $logger = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);
    $definition->addMethodCall('setLogger', array($logger));
}
|||||
| 649 | |||||||
/**
 * Build the default AMQP exchange options.
 *
 * The defaults describe an unnamed "direct" exchange that is passive and
 * is not declared.
 *
 * @return array Options keyed by "name", "type", "passive" and "declare"
 */
protected function getDefaultExchangeOptions()
{
    $defaults = array();
    $defaults['name'] = '';
    $defaults['type'] = 'direct';
    $defaults['passive'] = true;
    $defaults['declare'] = false;

    return $defaults;
}
||||||
| 664 | |||||||
/**
 * Build the default AMQP queue options.
 *
 * The defaults describe an unnamed queue that is not declared.
 *
 * @return array Options keyed by "name" and "declare"
 */
protected function getDefaultQueueOptions()
{
    $defaults = array();
    $defaults['name'] = '';
    $defaults['declare'] = false;

    return $defaults;
}
||||||
| 677 | } |
||||||
| 678 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.