eMAGTechLabs /
RabbitMqBundle
| 1 | <?php |
||||
| 2 | |||||
| 3 | namespace OldSound\RabbitMqBundle\DependencyInjection; |
||||
| 4 | |||||
| 5 | use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
||||
| 6 | use Symfony\Component\DependencyInjection\ContainerInterface; |
||||
| 7 | use Symfony\Component\DependencyInjection\Extension\Extension; |
||||
| 8 | use Symfony\Component\DependencyInjection\ContainerBuilder; |
||||
| 9 | use Symfony\Component\DependencyInjection\Definition; |
||||
| 10 | use Symfony\Component\DependencyInjection\Reference; |
||||
| 11 | use Symfony\Component\DependencyInjection\Loader\XmlFileLoader; |
||||
| 12 | use Symfony\Component\Config\FileLocator; |
||||
| 13 | |||||
| 14 | /** |
||||
| 15 | * OldSoundRabbitMqExtension. |
||||
| 16 | * |
||||
| 17 | * @author Alvaro Videla |
||||
| 18 | * @author Marc Weistroff <[email protected]> |
||||
| 19 | */ |
||||
| 20 | class OldSoundRabbitMqExtension extends Extension |
||||
| 21 | { |
||||
| 22 | /** |
||||
| 23 | * @var ContainerBuilder |
||||
| 24 | */ |
||||
| 25 | private $container; |
||||
| 26 | |||||
| 27 | /** |
||||
| 28 | * @var bool Whether the data collector is enabled |
||||
| 29 | */ |
||||
| 30 | private $collectorEnabled; |
||||
| 31 | |||||
| 32 | private $channelIds = array(); |
||||
| 33 | |||||
| 34 | private $config = array(); |
||||
| 35 | |||||
/**
 * Loads the bundle's service definitions and turns the processed configuration into
 * connection, binding, producer, consumer and RPC service definitions.
 *
 * When the collector is enabled and at least one logged channel was registered, the
 * data collector receives references to those channels; otherwise its definition is removed.
 *
 * @param array            $configs   Raw configuration arrays for this extension
 * @param ContainerBuilder $container Container the definitions are registered in
 */
public function load(array $configs, ContainerBuilder $container)
{
    $this->container = $container;
    // Channel ids are collected on the instance; reset them so a repeated load() on the
    // same extension object does not reference channels registered in an earlier container.
    $this->channelIds = array();

    $loader = new XmlFileLoader($container, new FileLocator(array(__DIR__ . '/../Resources/config')));
    $loader->load('rabbitmq.xml');

    $configuration = $this->getConfiguration($configs, $container);
    $this->config = $this->processConfiguration($configuration, $configs);

    $this->collectorEnabled = $this->config['enable_collector'];

    $this->loadConnections();
    $this->loadBindings();
    $this->loadProducers();
    $this->loadConsumers();
    $this->loadMultipleConsumers();
    $this->loadDynamicConsumers();
    $this->loadBatchConsumers();
    $this->loadAnonConsumers();
    $this->loadRpcClients();
    $this->loadRpcServers();

    // Explicit emptiness test instead of relying on the implicit array-to-boolean conversion.
    if ($this->collectorEnabled && !empty($this->channelIds)) {
        $channels = array();
        foreach (array_unique($this->channelIds) as $id) {
            $channels[] = new Reference($id);
        }

        $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
        $definition->replaceArgument(0, $channels);
    } else {
        $container->removeDefinition('old_sound_rabbit_mq.data_collector');
    }
}
|||
| 71 | |||||
/**
 * Builds the configuration object for this extension, named after the extension alias.
 *
 * @param array            $config    Raw configuration (not read here)
 * @param ContainerBuilder $container Container being built (not read here)
 *
 * @return Configuration
 */
public function getConfiguration(array $config, ContainerBuilder $container)
{
    $alias = $this->getAlias();

    return new Configuration($alias);
}
||||
| 76 | |||||
/**
 * Registers two services per configured connection: a private connection factory
 * ("old_sound_rabbit_mq.connection_factory.<key>") and the public, tagged connection
 * ("old_sound_rabbit_mq.connection.<key>") created through that factory's createConnection().
 */
protected function loadConnections()
{
    foreach ($this->config['connections'] as $key => $connection) {
        $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
        $classParam =
            $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
                : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';

        $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
            $classParam, $connection,
        ));
        if (isset($connection['connection_parameters_provider'])) {
            // The provider service is passed as an extra factory argument. The previous
            // unset() of this key only touched the loop-local copy after the Definition had
            // already captured the array, so it had no effect and has been dropped.
            $definition->addArgument(new Reference($connection['connection_parameters_provider']));
        }
        $definition->setPublic(false);
        $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
        $this->container->setDefinition($factoryName, $definition);

        $definition = new Definition($classParam);
        if (method_exists($definition, 'setFactory')) {
            // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
            $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
        } else {
            // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
            $definition->setFactoryService($factoryName);
            $definition->setFactoryMethod('createConnection');
        }
        $definition->addTag('old_sound_rabbit_mq.connection');
        $definition->setPublic(true);

        $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
    }
}
|||
| 112 | |||||
/**
 * Registers one tagged binding service per configured binding; does nothing in sandbox mode.
 *
 * The service id suffix is an md5 of the key-sorted binding configuration, so the same
 * binding always maps to the same id.
 */
protected function loadBindings()
{
    if ($this->config['sandbox']) {
        return;
    }
    foreach ($this->config['bindings'] as $binding) {
        // Sort keys so the hash below does not depend on the order the options were written in.
        ksort($binding);
        $definition = new Definition($binding['class']);
        $definition->addTag('old_sound_rabbit_mq.binding');
        $definition->addMethodCall('setArguments', array($binding['arguments']));
        $definition->addMethodCall('setDestination', array($binding['destination']));
        $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
        $definition->addMethodCall('setExchange', array($binding['exchange']));
        // Was 'isNowait': an accessor name called with an argument, unlike every sibling
        // setter here, so the configured flag presumably never reached the binding.
        // NOTE(review): assumes the binding class exposes setNowait() - confirm.
        $definition->addMethodCall('setNowait', array($binding['nowait']));
        $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
        $this->injectConnection($definition, $binding['connection']);
        $key = md5(json_encode($binding));
        if ($this->collectorEnabled) {
            // in the context of a binding, I don't think logged channels are needed?
            $this->injectLoggedChannel($definition, $key, $binding['connection']);
        }
        $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
    }
}
|||
| 137 | |||||
/**
 * Registers one producer service per configured producer
 * ("old_sound_rabbit_mq.<key>_producer").
 *
 * In sandbox mode each producer is replaced by a bare definition of the fallback class.
 * Otherwise the producer gets its exchange/queue options, connection, optional logged
 * channel and logger, and an alias when 'service_alias' is not null.
 */
protected function loadProducers()
{
    if ($this->config['sandbox']) {
        foreach ($this->config['producers'] as $key => $producer) {
            $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
        }

        return;
    }

    foreach ($this->config['producers'] as $key => $producer) {
        $definition = new Definition($producer['class']);
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.producer');
        //this producer doesn't define an exchange -> using AMQP Default
        if (!isset($producer['exchange_options'])) {
            $producer['exchange_options'] = $this->getDefaultExchangeOptions();
        }
        $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
        //this producer doesn't define a queue -> using AMQP Default
        if (!isset($producer['queue_options'])) {
            $producer['queue_options'] = $this->getDefaultQueueOptions();
        }
        // Queue options go through the same key normalization as the exchange options above
        // and as every consumer's queue options; previously they were passed through raw.
        $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($producer['queue_options'])));
        $this->injectConnection($definition, $producer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $producer['connection']);
        }
        if (!$producer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        if ($producer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);

        $this->container->setDefinition($producerServiceName, $definition);
        if (null !== $producer['service_alias']) {
            $this->container->setAlias($producer['service_alias'], $producerServiceName);
        }
    }
}
|||
| 182 | |||||
/**
 * Registers one public consumer service per configured consumer
 * ("old_sound_rabbit_mq.<key>_consumer") and wires its callback, options and connection.
 */
protected function loadConsumers()
{
    // Optional scalar settings, applied in this order when present in the configuration.
    $optionalSetters = array(
        'idle_timeout'           => 'setIdleTimeout',
        'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
        'timeout_wait'           => 'setTimeoutWait',
    );

    foreach ($this->config['consumers'] as $key => $consumer) {
        // A consumer without exchange / queue options falls back to the AMQP defaults.
        $exchangeOptions = isset($consumer['exchange_options'])
            ? $consumer['exchange_options']
            : $this->getDefaultExchangeOptions();
        $queueOptions = isset($consumer['queue_options'])
            ? $consumer['queue_options']
            : $this->getDefaultQueueOptions();

        $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
        $definition
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.consumer')
            ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)))
            ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($queueOptions)))
            ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

        if (array_key_exists('qos_options', $consumer)) {
            $qos = $consumer['qos_options'];
            $definition->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );
        }

        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, array($consumer[$option]));
            }
        }

        if (isset($consumer['graceful_max_execution'])) {
            $graceful = $consumer['graceful_max_execution'];
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($graceful['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($graceful['exit_code'])
            );
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }

        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
        $this->container->setDefinition($serviceId, $definition);
        $this->addDequeuerAwareCall($consumer['callback'], $serviceId);
    }
}
|||
| 247 | |||||
/**
 * Registers one public multi-queue consumer service per configured entry
 * ("old_sound_rabbit_mq.<key>_multiple").
 *
 * @throws InvalidConfigurationException When neither 'queues' nor 'queues_provider' is set
 */
protected function loadMultipleConsumers()
{
    foreach ($this->config['multiple_consumers'] as $key => $consumer) {
        $queues = array();
        $callbacks = array();

        if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
            throw new InvalidConfigurationException(
                "Error on loading $key multiple consumer. " .
                "Either 'queues' or 'queues_provider' parameters should be defined."
            );
        }

        // The check above accepts a missing 'queues' or 'queues_provider' as long as the
        // other one is set, so read both defensively instead of indexing them unguarded.
        $configuredQueues = empty($consumer['queues']) ? array() : $consumer['queues'];
        $queuesProvider = empty($consumer['queues_provider']) ? null : $consumer['queues_provider'];

        foreach ($configuredQueues as $queueOptions) {
            // Queues are indexed by their configured name; the callback service id becomes a callable.
            $queues[$queueOptions['name']] = $queueOptions;
            $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
            $callbacks[] = $queueOptions['callback'];
        }

        $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
        $definition
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.multi_consumer')
            ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
            ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));

        if (null !== $queuesProvider) {
            $definition->addMethodCall(
                'setQueuesProvider',
                array(new Reference($queuesProvider))
            );
        }

        if (array_key_exists('qos_options', $consumer)) {
            $definition->addMethodCall('setQosOptions', array(
                $consumer['qos_options']['prefetch_size'],
                $consumer['qos_options']['prefetch_count'],
                $consumer['qos_options']['global']
            ));
        }

        if (isset($consumer['idle_timeout'])) {
            $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
        }
        if (isset($consumer['idle_timeout_exit_code'])) {
            $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
        }
        if (isset($consumer['timeout_wait'])) {
            $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
        }
        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($consumer['graceful_max_execution']['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($consumer['graceful_max_execution']['exit_code'])
            );
        }
        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }

        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
        $this->container->setDefinition($name, $definition);
        if (null !== $queuesProvider) {
            $this->addDequeuerAwareCall($queuesProvider, $name);
        }
        foreach ($callbacks as $callback) {
            $this->addDequeuerAwareCall($callback, $name);
        }
    }
}
|||
| 332 | |||||
/**
 * Registers one public dynamic consumer service per configured entry
 * ("old_sound_rabbit_mq.<key>_dynamic"), whose queue options come from a provider service.
 *
 * @throws InvalidConfigurationException When 'queue_options_provider' is empty
 */
protected function loadDynamicConsumers()
{
    foreach ($this->config['dynamic_consumers'] as $key => $consumer) {

        if (empty($consumer['queue_options_provider'])) {
            // The message now names the option that is actually checked; it used to say
            // 'queue_provider', which is not the key read here.
            throw new InvalidConfigurationException(
                "Error on loading $key dynamic consumer. " .
                "'queue_options_provider' parameter should be defined."
            );
        }

        $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
        $definition
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.consumer')
            ->addTag('old_sound_rabbit_mq.dynamic_consumer')
            ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
            ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

        if (array_key_exists('qos_options', $consumer)) {
            $definition->addMethodCall('setQosOptions', array(
                $consumer['qos_options']['prefetch_size'],
                $consumer['qos_options']['prefetch_count'],
                $consumer['qos_options']['global']
            ));
        }

        $definition->addMethodCall(
            'setQueueOptionsProvider',
            array(new Reference($consumer['queue_options_provider']))
        );

        if (isset($consumer['idle_timeout'])) {
            $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
        }
        if (isset($consumer['idle_timeout_exit_code'])) {
            $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
        }
        if (isset($consumer['timeout_wait'])) {
            $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
        }
        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($consumer['graceful_max_execution']['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($consumer['graceful_max_execution']['exit_code'])
            );
        }
        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }

        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
        $this->container->setDefinition($name, $definition);
        $this->addDequeuerAwareCall($consumer['callback'], $name);
        $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
    }
}
|||
| 404 | |||||
/**
 * Registers one public batch consumer service per configured entry
 * ("old_sound_rabbit_mq.<key>_batch"); its callback is invoked through 'batchExecute'.
 */
protected function loadBatchConsumers()
{
    foreach ($this->config['batch_consumers'] as $key => $consumer) {
        // Without explicit exchange options the AMQP default exchange is used.
        $exchangeOptions = isset($consumer['exchange_options'])
            ? $consumer['exchange_options']
            : $this->getDefaultExchangeOptions();
        $qos = $consumer['qos_options'];
        $callable = array(new Reference($consumer['callback']), 'batchExecute');

        $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.batch_consumer');
        $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
        $definition->addMethodCall('setPrefetchCount', array($qos['prefetch_count']));
        $definition->addMethodCall('setCallback', array($callable));
        $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));
        $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
        $definition->addMethodCall(
            'setQosOptions',
            array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
        );

        // Optional scalar settings, applied in this order when present.
        $optionalSetters = array(
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'idle_timeout'           => 'setIdleTimeout',
        );
        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, array($consumer[$option]));
            }
        }

        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($consumer['graceful_max_execution']['timeout'])
            );
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        if ($consumer['keep_alive']) {
            $definition->addMethodCall('keepAlive');
        }

        $this->injectConnection($definition, $consumer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $consumer['connection']);
        }

        if ($consumer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
        $this->container->setDefinition($serviceId, $definition);
    }
}
|||
| 465 | |||||
/**
 * Registers one public anonymous consumer service per configured entry
 * ("old_sound_rabbit_mq.<key>_anon").
 */
protected function loadAnonConsumers()
{
    foreach ($this->config['anon_consumers'] as $key => $anon) {
        $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
        $callable = array(new Reference($anon['callback']), 'execute');

        $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.anon_consumer');
        $definition->addMethodCall('setExchangeOptions', array($exchangeOptions));
        $definition->addMethodCall('setCallback', array($callable));

        $this->injectConnection($definition, $anon['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $anon['connection']);
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
        $this->container->setDefinition($serviceId, $definition);
        $this->addDequeuerAwareCall($anon['callback'], $serviceId);
    }
}
|||
| 486 | |||||
/**
 * Restores '-' in the keys of the 'arguments' entry of an options array.
 *
 * Symfony 2 converts '-' to '_' in configuration keys, which breaks arguments such as
 * x-ha-policy, so every '_' in an argument key is turned back into '-'. Arguments given
 * as a string (old configuration format) are converted to an array first. Options
 * without an 'arguments' entry are returned untouched.
 *
 * @param array $config
 *
 * @return array
 */
private function normalizeArgumentKeys(array $config)
{
    if (!isset($config['arguments'])) {
        return $config;
    }

    // support for old configuration
    $rawArguments = is_string($config['arguments'])
        ? $this->argumentsStringAsArray($config['arguments'])
        : $config['arguments'];

    $normalized = array();
    foreach ($rawArguments as $argumentName => $argumentValue) {
        // str_replace() leaves keys without '_' as they are, so no prior check is needed.
        $normalized[str_replace('_', '-', $argumentName)] = $argumentValue;
    }
    $config['arguments'] = $normalized;

    return $config;
}
||||
| 515 | |||||
/**
 * Support for arguments provided as string. Support for old configuration files.
 *
 * The string is a comma-separated list of "name:value" or "name:value:type" segments;
 * the type defaults to 'S'. Blank segments (empty input, stray or trailing commas) are
 * skipped, and a segment without a value part yields a null value.
 *
 * @deprecated
 * @param string $arguments
 * @return array Map of argument name to array(type, value)
 */
private function argumentsStringAsArray($arguments)
{
    $argumentsArray = array();

    foreach (explode(',', $arguments) as $argument) {
        // explode() returns array('') for an empty string; such segments carry no argument
        // and used to produce a bogus '' key plus an undefined-offset notice.
        if ('' === trim($argument)) {
            continue;
        }

        $argumentPair = explode(':', $argument);
        $type = isset($argumentPair[2]) ? $argumentPair[2] : 'S';
        // A segment without ':' has no value part; keep null without reading an undefined offset.
        $value = isset($argumentPair[1]) ? $argumentPair[1] : null;

        $argumentsArray[$argumentPair[0]] = array($type, $value);
    }

    return $argumentsArray;
}
||||
| 539 | |||||
/**
 * Registers one public RPC client service per configured client
 * ("old_sound_rabbit_mq.<key>_rpc").
 */
protected function loadRpcClients()
{
    // Optional settings, applied in this order when the key exists (even with a null value).
    $optionalSetters = array(
        'unserializer'    => 'setUnserializer',
        'direct_reply_to' => 'setDirectReplyTo',
    );

    foreach ($this->config['rpc_clients'] as $key => $client) {
        $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
        $definition->setLazy($client['lazy']);
        $definition->addTag('old_sound_rabbit_mq.rpc_client');
        $definition->addMethodCall('initClient', array($client['expect_serialized_response']));

        $this->injectConnection($definition, $client['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $client['connection']);
        }

        foreach ($optionalSetters as $option => $setter) {
            if (array_key_exists($option, $client)) {
                $definition->addMethodCall($setter, array($client[$option]));
            }
        }
        $definition->setPublic(true);

        $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $key);
        $this->container->setDefinition($serviceId, $definition);
    }
}
|||
| 563 | |||||
/**
 * Registers one public RPC server service per configured server
 * ("old_sound_rabbit_mq.<key>_server"), initialised with its configuration key.
 */
protected function loadRpcServers()
{
    // Optional settings passed through as-is, in this order, when the key exists.
    $optionalSetters = array(
        'exchange_options' => 'setExchangeOptions',
        'queue_options'    => 'setQueueOptions',
        'serializer'       => 'setSerializer',
    );

    foreach ($this->config['rpc_servers'] as $key => $server) {
        $callable = array(new Reference($server['callback']), 'execute');

        $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.rpc_server');
        $definition->addMethodCall('initServer', array($key));
        $definition->addMethodCall('setCallback', array($callable));

        $this->injectConnection($definition, $server['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $server['connection']);
        }

        if (array_key_exists('qos_options', $server)) {
            $qos = $server['qos_options'];
            $definition->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );
        }

        foreach ($optionalSetters as $option => $setter) {
            if (array_key_exists($option, $server)) {
                $definition->addMethodCall($setter, array($server[$option]));
            }
        }

        $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $key);
        $this->container->setDefinition($serviceId, $definition);
    }
}
|||
| 597 | |||||
/**
 * Registers a private logged-channel service ("old_sound_rabbit_mq.channel.<name>") bound to
 * the given connection, records its id for the data collector and appends a reference to
 * it as an argument of the given definition.
 *
 * @param Definition $definition     Definition that receives the channel reference
 * @param string     $name           Suffix of the channel service id
 * @param string     $connectionName Key of the connection the channel uses
 */
protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
{
    $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

    $channelDefinition = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
    $channelDefinition->setPublic(false);
    $channelDefinition->addTag('old_sound_rabbit_mq.logged_channel');
    $this->injectConnection($channelDefinition, $connectionName);

    $this->container->setDefinition($channelId, $channelDefinition);
    $this->channelIds[] = $channelId;

    $definition->addArgument(new Reference($channelId));
}
|||
| 612 | |||||
/**
 * Appends a reference to the "old_sound_rabbit_mq.connection.<connectionName>" service as
 * an argument of the given definition.
 *
 * @param Definition $definition
 * @param string     $connectionName Key of the configured connection
 */
protected function injectConnection(Definition $definition, $connectionName)
{
    $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);

    $definition->addArgument(new Reference($connectionId));
}
|||
| 617 | |||||
/**
 * Returns the alias of this extension, which is also used to build its Configuration.
 *
 * @return string
 */
public function getAlias()
{
    $alias = 'old_sound_rabbit_mq';

    return $alias;
}
||||
| 622 | |||||
/**
 * Add proper dequeuer aware call
 *
 * When the callback service is known to the container and its class implements
 * DequeuerAwareInterface, a 'setDequeuer' call referencing the consumer service is added
 * to the callback definition. Unknown callbacks and definitions without a class are skipped.
 *
 * @param string $callback Service id of the callback
 * @param string $name     Service id of the consumer to inject as dequeuer
 */
protected function addDequeuerAwareCall($callback, $name)
{
    if (!$this->container->has($callback)) {
        return;
    }

    $callbackDefinition = $this->container->findDefinition($callback);

    // The class may be declared as a "%parameter%" placeholder; resolve it first, since
    // reflecting on the raw placeholder string would fail with "class does not exist".
    $callbackClass = $this->container->getParameterBag()->resolveValue($callbackDefinition->getClass());
    if (null === $callbackClass || '' === $callbackClass) {
        // No class on the definition: the interface check cannot be made.
        return;
    }

    $refClass = new \ReflectionClass($callbackClass);
    if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
        $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
    }
}
||||
| 641 | |||||
/**
 * Tags the definition for the 'phpamqplib' monolog channel and adds a 'setLogger' call
 * with a 'logger' reference that is ignored when that service does not exist.
 *
 * @param Definition $definition
 */
private function injectLogger(Definition $definition)
{
    $loggerReference = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);

    $definition->addTag('monolog.logger', array('channel' => 'phpamqplib'));
    $definition->addMethodCall('setLogger', array($loggerReference));
}
|||
| 649 | |||||
/**
 * Get default AMQP exchange options: unnamed direct exchange, passive, not declared.
 *
 * @return array
 */
protected function getDefaultExchangeOptions()
{
    $options = array();
    $options['name'] = '';
    $options['type'] = 'direct';
    $options['passive'] = true;
    $options['declare'] = false;

    return $options;
}
||||
| 664 | |||||
/**
 * Get default AMQP queue options: unnamed queue that is not declared.
 *
 * @return array
 */
protected function getDefaultQueueOptions()
{
    $options = array();
    $options['name'] = '';
    $options['declare'] = false;

    return $options;
}
||||
| 677 | } |
||||
| 678 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
`empty(...)` or `!empty(...)` instead.