eMAGTechLabs /
RabbitMqBundle
| 1 | <?php |
||
| 2 | |||
| 3 | namespace OldSound\RabbitMqBundle\DependencyInjection; |
||
| 4 | |||
| 5 | use OldSound\RabbitMqBundle\Consumer\ConsumersRegistry; |
||
|
0 ignored issues
–
show
|
|||
| 6 | use OldSound\RabbitMqBundle\Declarations\DeclarationsRegistry; |
||
| 7 | use OldSound\RabbitMqBundle\Declarations\QueueConsuming; |
||
| 8 | use OldSound\RabbitMqBundle\Declarations\BindingDeclaration; |
||
| 9 | use OldSound\RabbitMqBundle\Declarations\ExchangeDeclaration; |
||
| 10 | use OldSound\RabbitMqBundle\Declarations\QueueDeclaration; |
||
| 11 | use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\SimpleExecuteCallbackStrategy; |
||
| 12 | use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\BatchExecuteCallbackStrategy; |
||
| 13 | use PhpAmqpLib\Channel\AMQPChannel; |
||
| 14 | use PhpAmqpLib\Connection\AbstractConnection; |
||
| 15 | use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
||
| 16 | use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument; |
||
| 17 | use Symfony\Component\DependencyInjection\ContainerInterface; |
||
| 18 | use Symfony\Component\DependencyInjection\Extension\Extension; |
||
| 19 | use Symfony\Component\DependencyInjection\ContainerBuilder; |
||
| 20 | use Symfony\Component\DependencyInjection\Definition; |
||
| 21 | use Symfony\Component\DependencyInjection\Reference; |
||
| 22 | use Symfony\Component\DependencyInjection\Loader\XmlFileLoader; |
||
| 23 | use Symfony\Component\Config\FileLocator; |
||
| 24 | use Symfony\Component\HttpKernel\Log\Logger; |
||
| 25 | use Symfony\Component\Serializer\SerializerInterface; |
||
| 26 | |||
| 27 | /**+ |
||
| 28 | * OldSoundRabbitMqExtension. |
||
| 29 | * |
||
| 30 | * @author Alvaro Videla |
||
| 31 | * @author Marc Weistroff <[email protected]> |
||
| 32 | */ |
||
| 33 | class OldSoundRabbitMqExtension extends Extension |
||
| 34 | { |
||
| 35 | /** |
||
| 36 | 31 | * @var ContainerBuilder |
|
| 37 | */ |
||
| 38 | 31 | private $container; |
|
| 39 | |||
| 40 | 31 | /** |
|
| 41 | 31 | * @var Boolean Whether the data collector is enabled |
|
| 42 | */ |
||
| 43 | 31 | private $collectorEnabled; |
|
| 44 | 31 | ||
| 45 | private $channelIds = []; |
||
| 46 | 31 | private $groups = []; |
|
|
0 ignored issues
–
show
|
|||
| 47 | |||
| 48 | 31 | private $config = []; |
|
| 49 | 31 | ||
| 50 | 31 | public function load(array $configs, ContainerBuilder $container) |
|
| 51 | 31 | { |
|
| 52 | 31 | $this->container = $container; |
|
| 53 | 31 | ||
| 54 | 31 | $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config'))); |
|
| 55 | 31 | $loader->load('rabbitmq.xml'); |
|
| 56 | 31 | ||
| 57 | 31 | $configuration = $this->getConfiguration($configs, $container); |
|
| 58 | $this->config = $this->processConfiguration($configuration, $configs); |
||
| 59 | 31 | ||
| 60 | 28 | $this->collectorEnabled = $this->config['enable_collector']; |
|
| 61 | 28 | ||
| 62 | 28 | $this->loadConnections(); |
|
| 63 | |||
| 64 | $declarationRegistryDef = new Definition(DeclarationsRegistry::class); |
||
| 65 | 28 | $declarationRegistryDef->setPublic(true); |
|
| 66 | 28 | $declarationRegistryDef->setAutowired(true); |
|
| 67 | $this->container->setDefinition('old_sound_rabbit_mq.declaration_registry', $declarationRegistryDef); |
||
| 68 | 3 | ||
| 69 | # declarations |
||
| 70 | 31 | foreach ($this->loadExchanges($this->config['declarations']['exchanges']) as $exchange) { |
|
| 71 | $declarationRegistryDef->addMethodCall('addExchange', [$exchange]); |
||
| 72 | 31 | }; |
|
| 73 | foreach ($this->loadQueues($this->config['declarations']['queues']) as $queue) { |
||
| 74 | 31 | $declarationRegistryDef->addMethodCall('addQueue', [$queue]); |
|
| 75 | }; |
||
| 76 | foreach ($this->loadBindings($this->config['declarations']['bindings']) as $binding) { |
||
| 77 | 31 | $this->container->getDefinition('old_sound_rabbit_mq.declaration_registry')->addMethodCall('addBinding', [$binding]); |
|
| 78 | }; |
||
| 79 | 31 | ||
| 80 | 31 | $this->loadProducers(); |
|
| 81 | $this->loadConsumers(); |
||
| 82 | 31 | ||
| 83 | 22 | if ($this->collectorEnabled && $this->channelIds) { |
|
| 84 | 31 | $channels = []; |
|
| 85 | foreach (array_unique($this->channelIds) as $id) { |
||
| 86 | 31 | $channels[] = new Reference($id); |
|
| 87 | 31 | } |
|
| 88 | |||
| 89 | 31 | $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector'); |
|
| 90 | $definition->replaceArgument(0, $channels); |
||
| 91 | } else { |
||
| 92 | $this->container->removeDefinition('old_sound_rabbit_mq.data_collector'); |
||
| 93 | 31 | } |
|
| 94 | 31 | } |
|
| 95 | 31 | ||
| 96 | public function getConfiguration(array $config, ContainerBuilder $container) |
||
| 97 | 31 | { |
|
| 98 | 31 | return new Configuration($this->getAlias()); |
|
| 99 | } |
||
| 100 | 31 | ||
| 101 | /** |
||
| 102 | * @return Definition[] |
||
| 103 | */ |
||
| 104 | protected function loadExchanges($exchanges): array |
||
| 105 | { |
||
| 106 | 31 | return array_map(function ($exchange) { |
|
| 107 | 31 | $exchangeDeclaration = new Definition(ExchangeDeclaration::class); |
|
| 108 | $exchangeDeclaration->setProperties($exchange); |
||
| 109 | 31 | ||
| 110 | foreach($this->loadBindings($exchange['bindings'], $exchange['name'], null) as $binding) { |
||
| 111 | 31 | $this->container->getDefinition('old_sound_rabbit_mq.declaration_registry')->addMethodCall('addBinding', [$binding]); |
|
| 112 | } |
||
| 113 | 31 | ||
| 114 | $this->container->setDefinition('old_sound_rabbit_mq.exchange.'.$exchange['name'], $exchangeDeclaration); |
||
| 115 | 31 | return $exchangeDeclaration; |
|
| 116 | }, $exchanges); |
||
| 117 | } |
||
| 118 | 31 | ||
| 119 | 22 | /** |
|
| 120 | 22 | * @return Definition[] |
|
| 121 | 22 | */ |
|
| 122 | 22 | protected function loadQueues($queues): array |
|
| 123 | 22 | { |
|
| 124 | 22 | return array_map(function ($queue, $key) use ($queues) { |
|
|
0 ignored issues
–
show
|
|||
| 125 | 22 | $queue['name'] = $queue['name'] ?? $key; |
|
| 126 | 22 | $queueDeclaration = new Definition(QueueDeclaration::class); |
|
| 127 | 22 | $queueDeclaration->setProperties($queue); |
|
| 128 | 22 | ||
| 129 | 22 | foreach ($this->loadBindings($queue['bindings'], null, $queue['name'], false) as $binding) { |
|
| 130 | 22 | $this->container->getDefinition('old_sound_rabbit_mq.declaration_registry')->addMethodCall('addBinding', [$binding]); |
|
| 131 | } |
||
| 132 | 22 | ||
| 133 | return $queueDeclaration; |
||
| 134 | 22 | }, $queues, array_keys($queues)); |
|
| 135 | } |
||
| 136 | 31 | ||
| 137 | protected function createBindingDef($binding, string $exchange = null, string $destination = null, bool $destinationIsExchange = null): Definition |
||
| 138 | 31 | { |
|
| 139 | $routingKeys = $binding['routing_keys']; |
||
| 140 | 31 | if (isset($binding['routing_key'])) { |
|
| 141 | 31 | $routingKeys[] = $binding['routing_key']; |
|
| 142 | 27 | } |
|
| 143 | 27 | $definition = new Definition(BindingDeclaration::class); |
|
| 144 | 27 | $definition->setProperties([ |
|
| 145 | 27 | 'exchange' => $exchange ? $exchange : $binding['exchange'], |
|
| 146 | 'destinationIsExchange' => isset($destinationIsExchange) ? $destinationIsExchange : $binding['destination_is_exchange'], |
||
| 147 | 27 | 'destination' => $destination ? $destination : $binding['destination'], |
|
| 148 | 1 | 'routingKeys' => array_unique($routingKeys), |
|
| 149 | // TODO 'arguments' => $binding['arguments'], |
||
| 150 | 27 | //'nowait' => $binding['nowait'], |
|
| 151 | ]); |
||
| 152 | 27 | ||
| 153 | 27 | return $definition; |
|
| 154 | } |
||
| 155 | 27 | ||
| 156 | 27 | protected function loadBindings($bindings, string $exchange = null, string $destination = null, bool $destinationIsExchange = null): array |
|
| 157 | 27 | { |
|
| 158 | 25 | $definitions = []; |
|
| 159 | foreach ($bindings as $binding) { |
||
| 160 | 27 | $definitions[] = $this->createBindingDef($binding, $exchange, $destination, $destinationIsExchange); |
|
| 161 | } |
||
| 162 | |||
| 163 | return $definitions; |
||
| 164 | 27 | } |
|
| 165 | |||
| 166 | protected function loadConnections() |
||
| 167 | { |
||
| 168 | 27 | $connFactoryDer = new Definition('%old_sound_rabbit_mq.connection_factory.class%'); |
|
| 169 | |||
| 170 | 27 | foreach ($this->config['connections'] as $key => $connection) { |
|
| 171 | 27 | $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class'; |
|
| 172 | 22 | $classParam = |
|
| 173 | $connection['lazy'] |
||
| 174 | ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%' |
||
| 175 | : '%old_sound_rabbit_mq.'.$connectionSuffix.'%'; |
||
| 176 | |||
| 177 | $definition = new Definition($classParam); |
||
| 178 | $definition->setPublic(false); |
||
| 179 | |||
| 180 | $definition->setFactory([$connFactoryDer, 'createConnection']); |
||
| 181 | 31 | $definition->setArguments([$classParam, $connection]); |
|
| 182 | |||
| 183 | 31 | $definition->addTag('old_sound_rabbit_mq.connection'); |
|
| 184 | $definition->setPublic(true); |
||
| 185 | 31 | ||
| 186 | 26 | $connectionAliase = sprintf('old_sound_rabbit_mq.connection.%s', $key); |
|
| 187 | 26 | $this->container->setDefinition($connectionAliase, $definition); |
|
| 188 | 26 | ||
| 189 | 26 | $channelDef = new Definition(AMQPChannel::class, [ |
|
| 190 | new Reference($connectionAliase) |
||
| 191 | 26 | ]); |
|
| 192 | $channelDef->setFactory([self::class, 'getChannelFromConnection']); |
||
| 193 | $channelDef->setPublic(true); |
||
| 194 | 26 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.channel.%s', $key), $channelDef); |
|
| 195 | } |
||
| 196 | 26 | } |
|
| 197 | |||
| 198 | public static function getChannelFromConnection(AbstractConnection $connection) |
||
| 199 | 26 | { |
|
| 200 | 26 | return $connection->channel(); |
|
| 201 | } |
||
| 202 | 26 | ||
| 203 | 22 | protected function loadProducers() |
|
| 204 | 22 | { |
|
| 205 | 22 | if ($this->config['sandbox']) { |
|
| 206 | 22 | foreach ($this->config['producers'] as $key => $producer) { |
|
| 207 | $definition = new Definition('%old_sound_rabbit_mq.fallback.class%'); |
||
| 208 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.producer.%s', $key), $definition); |
||
| 209 | } |
||
| 210 | 26 | return; |
|
| 211 | } |
||
| 212 | |||
| 213 | 26 | $defaultAutoDeclare = $this->container->getParameter('kernel.environment') !== 'prod'; |
|
| 214 | foreach ($this->config['producers'] as $producerName => $producer) { |
||
| 215 | $alias = sprintf('old_sound_rabbit_mq.producer.%s', $producerName); |
||
| 216 | 26 | ||
| 217 | $definition = new Definition($producer['class']); |
||
| 218 | $definition->setPublic(true); |
||
| 219 | $definition->addTag('old_sound_rabbit_mq.producer', ['producer' => $producerName]); |
||
| 220 | //this producer doesn't define an exchange -> using AMQP Default |
||
| 221 | if (!isset($producer['exchange_options'])) { |
||
| 222 | $producer['exchange_options'] = $this->getDefaultExchangeOptions(); |
||
| 223 | } |
||
| 224 | //$definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options']))); |
||
| 225 | //this producer doesn't define a queue -> using AMQP Default |
||
| 226 | 26 | if (!isset($producer['queue_options'])) { |
|
| 227 | $producer['queue_options'] = $this->getDefaultQueueOptions(); |
||
| 228 | } |
||
| 229 | //$definition->addMethodCall('setQueueOptions', array($producer['queue_options'])); |
||
| 230 | 26 | ||
| 231 | 26 | $definition->addArgument($this->createChannelReference($producer['connection'])); |
|
| 232 | 25 | $definition->addArgument($producer['exchange']); |
|
| 233 | //$this->injectConnection($definition, $producer['connection']); |
||
| 234 | //if ($this->collectorEnabled) { |
||
| 235 | 26 | // $this->injectTraceableChannel($definition, $key, $producer['connection']); |
|
| 236 | 1 | //} |
|
| 237 | |||
| 238 | if (isset($producer['auto_declare'])) { |
||
| 239 | 26 | $definition->setProperty('autoDeclare', $producer['auto_declare'] ?? $defaultAutoDeclare); |
|
| 240 | 26 | } |
|
| 241 | 26 | ||
| 242 | $this->container->setDefinition($alias, $definition); |
||
| 243 | 31 | if ($producer['logging']) { |
|
| 244 | $this->injectLogger($alias); |
||
| 245 | 31 | } |
|
| 246 | } |
||
| 247 | 31 | } |
|
| 248 | 22 | ||
| 249 | 22 | private function createChannelReference($connectionName): Reference |
|
| 250 | { |
||
| 251 | 22 | return new Reference(sprintf('old_sound_rabbit_mq.channel.%s', $connectionName)); |
|
| 252 | } |
||
| 253 | |||
| 254 | protected function loadConsumers() |
||
| 255 | { |
||
| 256 | foreach ($this->config['consumers'] as $consumerName => $consumer) { |
||
| 257 | $alias = sprintf('old_sound_rabbit_mq.consumer.%s', $consumerName); |
||
| 258 | 22 | $serializerAlias = sprintf('old_sound_rabbit_mq.consumer.%s.serializer', $consumerName);// TODO |
|
|
0 ignored issues
–
show
|
|||
| 259 | 22 | ||
| 260 | 22 | $connectionName = $consumer['connection'] ?? 'default'; |
|
| 261 | 22 | ||
| 262 | $definition = new Definition('%old_sound_rabbit_mq.consumer.class%', [ |
||
| 263 | $this->createChannelReference($connectionName) |
||
| 264 | 22 | ]); |
|
| 265 | $definition->setPublic(true); |
||
| 266 | 22 | $definition->addTag('old_sound_rabbit_mq.consumer', ['consumer' => $consumerName]); |
|
| 267 | 22 | // TODO $this->container->setAlias($serializerAlias, SerializerInterface::class); |
|
| 268 | 22 | // $definition->addMethodCall('setSerializer', [new Reference($serializerAlias)]);} |
|
| 269 | 22 | foreach($consumer['consumeQueues'] as $index => $consumeQueue) { |
|
| 270 | 22 | $queueConsumingDef = new Definition(QueueConsuming::class); |
|
| 271 | $queueConsumingDef->setProperties([ |
||
| 272 | 22 | 'queueName' => $consumeQueue['queue'], |
|
| 273 | 22 | 'callback' => new Reference($consumeQueue['callback']), |
|
| 274 | 22 | //'qosPrefetchSize' => $consumeQueue['qos_prefetch_size'], |
|
| 275 | 22 | 'qosPrefetchCount' => $consumeQueue['qos_prefetch_count'], |
|
| 276 | 'batchCount' => $consumeQueue['batch_count'] ?? null, |
||
| 277 | //'consumerTag' => $consumeQueue['consumer_tag'], |
||
| 278 | //'noLocal' => $consumeQueue['no_local'], |
||
| 279 | 22 | ]); |
|
| 280 | |||
| 281 | $queueConsumingDef->addTag(sprintf('old_sound_rabbit_mq.%s.queue_consuming', $connectionName)); |
||
| 282 | $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s.queue_consuming.%s', $connectionName, $consumerName), $queueConsumingDef); |
||
| 283 | $definition->addMethodCall('consumeQueue', [$queueConsumingDef]); |
||
| 284 | } |
||
| 285 | |||
| 286 | $definition->addMethodCall('setEventDispatcher', [ |
||
| 287 | 22 | new Reference('event_dispatcher', ContainerInterface::IGNORE_ON_INVALID_REFERENCE) |
|
| 288 | ]); |
||
| 289 | |||
| 290 | 22 | /* TODO if (array_key_exists('qos_options', $consumer)) { |
|
| 291 | $definition->addMethodCall('setQosOptions', array( |
||
| 292 | $consumer['qos_options']['prefetch_size'], |
||
| 293 | 22 | $consumer['qos_options']['prefetch_count'], |
|
| 294 | $consumer['qos_options']['global'] |
||
| 295 | )); |
||
| 296 | }*/ |
||
| 297 | |||
| 298 | if (isset($consumer['idle_timeout'])) { |
||
| 299 | $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout'])); |
||
| 300 | } |
||
| 301 | if (isset($consumer['idle_timeout_exit_code'])) { |
||
| 302 | $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code'])); |
||
| 303 | 22 | } |
|
| 304 | if (isset($consumer['timeout_wait'])) { |
||
| 305 | $definition->setProperty('timeoutWait', [$consumer['timeout_wait']]); |
||
| 306 | } |
||
| 307 | 22 | if (isset($consumer['graceful_max_execution'])) { |
|
| 308 | 22 | $definition->addMethodCall( |
|
| 309 | 22 | 'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture', |
|
| 310 | array($consumer['graceful_max_execution']['timeout']) |
||
| 311 | ); |
||
| 312 | 22 | $definition->addMethodCall( |
|
| 313 | 'setGracefulMaxExecutionTimeoutExitCode', |
||
| 314 | array($consumer['graceful_max_execution']['exit_code']) |
||
| 315 | ); |
||
| 316 | 22 | } |
|
| 317 | 22 | ||
| 318 | 22 | $this->injectConnection($definition, $consumer['connection']); |
|
| 319 | 22 | if ($this->collectorEnabled) { |
|
| 320 | $this->injectTraceableChannel($definition, $consumerName, $consumer['connection']); |
||
| 321 | 22 | } |
|
| 322 | 22 | ||
| 323 | $this->container->setDefinition($alias, $definition); |
||
| 324 | |||
| 325 | 31 | if ($consumer['logging']) { |
|
| 326 | $this->injectLogger($alias); |
||
| 327 | 31 | } |
|
| 328 | } |
||
| 329 | 31 | } |
|
| 330 | |||
| 331 | 22 | /** |
|
| 332 | * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy |
||
| 333 | * parameter. So we revert the change for right configurations. |
||
| 334 | * |
||
| 335 | * @param array $config |
||
| 336 | * |
||
| 337 | * @return array |
||
| 338 | 22 | */ |
|
| 339 | private function normalizeArgumentKeys(array $config) |
||
|
0 ignored issues
–
show
|
|||
| 340 | 22 | { |
|
| 341 | 22 | if (isset($config['arguments'])) { |
|
| 342 | 22 | $arguments = $config['arguments']; |
|
| 343 | 22 | // support for old configuration |
|
| 344 | 22 | if (is_string($arguments)) { |
|
| 345 | 22 | $arguments = $this->argumentsStringAsArray($arguments); |
|
| 346 | } |
||
| 347 | 22 | ||
| 348 | $newArguments = []; |
||
| 349 | foreach ($arguments as $key => $value) { |
||
| 350 | if (strstr($key, '_')) { |
||
| 351 | $key = str_replace('_', '-', $key); |
||
| 352 | } |
||
| 353 | $newArguments[$key] = $value; |
||
| 354 | } |
||
| 355 | 22 | $config['arguments'] = $newArguments; |
|
| 356 | 22 | } |
|
| 357 | 22 | return $config; |
|
| 358 | } |
||
| 359 | |||
| 360 | 22 | /** |
|
| 361 | * Support for arguments provided as string. Support for old configuration files. |
||
| 362 | * |
||
| 363 | 22 | * @deprecated |
|
| 364 | * @param string $arguments |
||
| 365 | * @return array |
||
| 366 | 22 | */ |
|
| 367 | private function argumentsStringAsArray($arguments) |
||
| 368 | { |
||
| 369 | $argumentsArray = []; |
||
| 370 | |||
| 371 | $argumentPairs = explode(',', $arguments); |
||
| 372 | foreach ($argumentPairs as $argument) { |
||
| 373 | $argumentPair = explode(':', $argument); |
||
| 374 | $type = 'S'; |
||
| 375 | if (isset($argumentPair[2])) { |
||
| 376 | 22 | $type = $argumentPair[2]; |
|
| 377 | } |
||
| 378 | $argumentsArray[$argumentPair[0]] = [$type, $argumentPair[1]]; |
||
| 379 | } |
||
| 380 | 22 | ||
| 381 | 22 | return $argumentsArray; |
|
| 382 | 22 | } |
|
| 383 | |||
| 384 | protected function injectTraceableChannel(Definition $definition, $name, $connectionName) |
||
| 385 | 22 | { |
|
| 386 | $id = sprintf('old_sound_rabbit_mq.channel.%s', $name); |
||
| 387 | $traceableChannel = new Definition('%old_sound_rabbit_mq.traceable.channel.class%'); |
||
| 388 | $traceableChannel |
||
| 389 | 22 | ->setPublic(false) |
|
| 390 | 22 | ->addTag('old_sound_rabbit_mq.traceable_channel'); |
|
| 391 | 22 | $this->injectConnection($traceableChannel, $connectionName); |
|
| 392 | 22 | ||
| 393 | $this->container->setDefinition($id, $traceableChannel); |
||
| 394 | 31 | ||
| 395 | $this->channelIds[] = $id; |
||
| 396 | 31 | $definition->addArgument(new Reference($id)); |
|
| 397 | } |
||
| 398 | 31 | ||
| 399 | protected function injectConnection(Definition $definition, $connectionName) |
||
| 400 | { |
||
| 401 | $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName))); |
||
| 402 | } |
||
| 403 | |||
| 404 | public function getAlias() |
||
| 405 | { |
||
| 406 | return 'old_sound_rabbit_mq'; |
||
| 407 | } |
||
| 408 | |||
| 409 | /** |
||
| 410 | * Add proper dequeuer aware call |
||
| 411 | * |
||
| 412 | * @param string $callback |
||
| 413 | * @param string $name |
||
| 414 | */ |
||
| 415 | protected function addDequeuerAwareCall($callback, $name) |
||
| 416 | { |
||
| 417 | if (!$this->container->has($callback)) { |
||
| 418 | return; |
||
| 419 | } |
||
| 420 | |||
| 421 | $callbackDefinition = $this->container->findDefinition($callback); |
||
| 422 | $refClass = new \ReflectionClass($callbackDefinition->getClass()); |
||
| 423 | if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) { |
||
| 424 | $callbackDefinition->addMethodCall('setDequeuer', [new Reference($name)]); |
||
| 425 | } |
||
| 426 | } |
||
| 427 | |||
| 428 | private function injectLogger(string $definitionAlias) |
||
| 429 | { |
||
| 430 | $definition = $this->container->getDefinition($definitionAlias); |
||
| 431 | $definition->addTag('monolog.logger', [ |
||
| 432 | 'channel' => 'phpamqplib' |
||
| 433 | ]); |
||
| 434 | $loggerAlias = $definitionAlias . '.loggeer'; |
||
| 435 | $this->container->setAlias($loggerAlias, 'logger'); |
||
| 436 | $definition->addMethodCall('setLogger', [new Reference($loggerAlias, ContainerInterface::IGNORE_ON_INVALID_REFERENCE)]); |
||
| 437 | } |
||
| 438 | |||
| 439 | /** |
||
| 440 | * Get default AMQP exchange options |
||
| 441 | * TODO use |
||
| 442 | * @return array |
||
| 443 | */ |
||
| 444 | protected function getDefaultExchangeOptions() |
||
| 445 | { |
||
| 446 | return array( |
||
| 447 | 'name' => '', |
||
| 448 | 'type' => 'direct', |
||
| 449 | 'passive' => true, |
||
| 450 | 'declare' => false |
||
| 451 | ); |
||
| 452 | } |
||
| 453 | |||
| 454 | /** |
||
| 455 | 31 | * Get default AMQP queue options |
|
| 456 | * TODO use |
||
| 457 | 31 | * @return array |
|
| 458 | */ |
||
| 459 | 31 | protected function getDefaultQueueOptions() |
|
| 460 | 22 | { |
|
| 461 | return array( |
||
| 462 | 22 | 'name' => '', |
|
| 463 | 22 | 'declare' => false |
|
| 464 | 22 | ); |
|
| 465 | 22 | } |
|
| 466 | } |
||
| 467 |
The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g.
excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths