Test Failed
Pull Request — master (#39)
by Aleksandr
05:36
created

injectTraceableChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1.0019

Importance

Changes 0
Metric Value
cc 1
eloc 9
c 0
b 0
f 0
nc 1
nop 3
dl 0
loc 13
ccs 7
cts 8
cp 0.875
crap 1.0019
rs 9.9666
1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\Consumer\ConsumersRegistry;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\Consumer\ConsumersRegistry was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
6
use OldSound\RabbitMqBundle\Declarations\DeclarationsRegistry;
7
use OldSound\RabbitMqBundle\Declarations\ConsumeOptions;
8
use OldSound\RabbitMqBundle\Declarations\BindingDeclaration;
9
use OldSound\RabbitMqBundle\Declarations\ExchangeDeclaration;
10
use OldSound\RabbitMqBundle\Declarations\QueueDeclaration;
11
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\SimpleExecuteCallbackStrategy;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...ExecuteCallbackStrategy was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
12
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\BatchExecuteCallbackStrategy;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...ExecuteCallbackStrategy was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
13
use PhpAmqpLib\Channel\AMQPChannel;
14
use PhpAmqpLib\Connection\AbstractConnection;
15
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
16
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
17
use Symfony\Component\DependencyInjection\ContainerInterface;
18
use Symfony\Component\DependencyInjection\Extension\Extension;
19
use Symfony\Component\DependencyInjection\ContainerBuilder;
20
use Symfony\Component\DependencyInjection\Definition;
21
use Symfony\Component\DependencyInjection\Reference;
22
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
23
use Symfony\Component\Config\FileLocator;
24
use Symfony\Component\HttpKernel\Log\Logger;
25
use Symfony\Component\Serializer\SerializerInterface;
26
27
/**
28
 * OldSoundRabbitMqExtension.
29
 *
30
 * @author Alvaro Videla
31
 * @author Marc Weistroff <[email protected]>
32
 */
33
class OldSoundRabbitMqExtension extends Extension
34
{
35
    /**
     * Container being configured; assigned at the start of load().
     *
     * @var ContainerBuilder
     */
    private $container;

    /**
     * @var bool Whether the data collector is enabled
     */
    private $collectorEnabled;

    /**
     * Service ids of the traceable channels registered by injectTraceableChannel().
     *
     * @var string[]
     */
    private $channelIds = [];

    /**
     * Processed bundle configuration, populated by load().
     *
     * @var array
     */
    private $config = [];
49 31
50 31
    /**
     * Processes the bundle configuration and registers the RabbitMQ services
     * (connections, declaration registry, producers, consumers) in the container.
     *
     * The data collector definition is kept only when the collector is enabled and
     * at least one traceable channel id was recorded; otherwise it is removed.
     *
     * @param array            $configs   Raw configuration arrays for this bundle
     * @param ContainerBuilder $container Container the services are registered in
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $xmlLoader = new XmlFileLoader($this->container, new FileLocator([__DIR__ . '/../Resources/config']));
        $xmlLoader->load('rabbitmq.xml');

        $this->config = $this->processConfiguration(
            $this->getConfiguration($configs, $container),
            $configs
        );
        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();

        // The registry has to be in the container before the declarations are loaded:
        // loadExchanges() and loadQueues() look it up by id to attach nested bindings.
        $registryId = 'old_sound_rabbit_mq.declaration_registry';
        $registryDef = new Definition(DeclarationsRegistry::class);
        $registryDef->setPublic(true);
        $registryDef->setAutowired(true);
        $this->container->setDefinition($registryId, $registryDef);

        foreach ($this->loadExchanges($this->config['declarations']['exchanges']) as $exchangeDef) {
            $registryDef->addMethodCall('addExchange', [$exchangeDef]);
        }
        foreach ($this->loadQueues($this->config['declarations']['queues']) as $queueDef) {
            $registryDef->addMethodCall('addQueue', [$queueDef]);
        }
        foreach ($this->loadBindings($this->config['declarations']['bindings']) as $bindingDef) {
            $this->container->getDefinition($registryId)->addMethodCall('addBinding', [$bindingDef]);
        }

        $this->loadProducers();
        $this->loadConsumers();

        if (!$this->collectorEnabled || !$this->channelIds) {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');

            return;
        }

        $channelRefs = [];
        foreach (array_unique($this->channelIds) as $channelId) {
            $channelRefs[] = new Reference($channelId);
        }

        $container->getDefinition('old_sound_rabbit_mq.data_collector')->replaceArgument(0, $channelRefs);
    }
95 31
96
    /**
     * Creates the bundle's Configuration object, passing it the extension alias.
     *
     * Neither argument is read by this implementation.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
100 31
101
    /**
     * Creates one ExchangeDeclaration definition per configured exchange.
     *
     * Each definition is registered as "old_sound_rabbit_mq.exchange.<name>", and the
     * bindings nested under the exchange are added to the declaration registry with the
     * exchange name forced onto them.
     *
     * @param array $exchanges Exchange configurations; each entry is read for "name" and "bindings"
     *
     * @return Definition[] One definition per entry, keyed like $exchanges
     */
    protected function loadExchanges($exchanges): array
    {
        $buildExchange = function ($exchangeConfig) {
            $exchangeDef = new Definition(ExchangeDeclaration::class);
            $exchangeDef->setProperties($exchangeConfig);

            $nestedBindings = $this->loadBindings($exchangeConfig['bindings'], $exchangeConfig['name'], null);
            foreach ($nestedBindings as $bindingDef) {
                $this->container
                    ->getDefinition('old_sound_rabbit_mq.declaration_registry')
                    ->addMethodCall('addBinding', [$bindingDef]);
            }

            $this->container->setDefinition('old_sound_rabbit_mq.exchange.' . $exchangeConfig['name'], $exchangeDef);

            return $exchangeDef;
        };

        return array_map($buildExchange, $exchanges);
    }
118 31
119 22
    /**
     * Creates one QueueDeclaration definition per configured queue.
     *
     * A queue without a "name" entry is named after its configuration key. The bindings
     * nested under the queue are added to the declaration registry with the queue as
     * destination and "destination is exchange" forced to false.
     *
     * @param array $queues Queue configurations keyed by queue key; each entry is read for
     *                      "name" (optional) and "bindings"
     *
     * @return Definition[] Queue definitions as a sequential list, in configuration order
     */
    protected function loadQueues($queues): array
    {
        $definitions = [];

        foreach ($queues as $key => $queue) {
            $queue['name'] = $queue['name'] ?? $key;

            $queueDeclaration = new Definition(QueueDeclaration::class);
            $queueDeclaration->setProperties($queue);

            foreach ($this->loadBindings($queue['bindings'], null, $queue['name'], false) as $binding) {
                $this->container
                    ->getDefinition('old_sound_rabbit_mq.declaration_registry')
                    ->addMethodCall('addBinding', [$binding]);
            }

            $definitions[] = $queueDeclaration;
        }

        return $definitions;
    }
136 31
137
    /**
     * Builds a BindingDeclaration definition from a single binding configuration.
     *
     * Explicit arguments take precedence over the configuration: a truthy $exchange or
     * $destination replaces the "exchange" / "destination" entry, and a non-null
     * $destinationIsExchange replaces "destination_is_exchange". The optional single
     * "routing_key" is appended to "routing_keys" and duplicates are removed.
     *
     * @param array       $binding               Binding configuration
     * @param string|null $exchange              Exchange name overriding the configured one
     * @param string|null $destination           Destination name overriding the configured one
     * @param bool|null   $destinationIsExchange Override for the configured flag
     *
     * @return Definition
     */
    protected function createBindingDef($binding, string $exchange = null, string $destination = null, bool $destinationIsExchange = null): Definition
    {
        $keys = $binding['routing_keys'];
        if (isset($binding['routing_key'])) {
            $keys[] = $binding['routing_key'];
        }

        $bindingDef = new Definition(BindingDeclaration::class);

        // TODO: the "arguments" and "nowait" entries of the binding are not forwarded yet.
        $properties = [
            'exchange' => $exchange ?: $binding['exchange'],
            'destinationIsExchange' => $destinationIsExchange ?? $binding['destination_is_exchange'],
            'destination' => $destination ?: $binding['destination'],
            'routingKeys' => array_unique($keys),
        ];
        $bindingDef->setProperties($properties);

        return $bindingDef;
    }
155 27
156 27
    /**
     * Turns a list of binding configurations into BindingDeclaration definitions.
     *
     * The optional arguments are passed unchanged to createBindingDef() for every binding.
     *
     * @param array       $bindings              Binding configurations
     * @param string|null $exchange              Exchange name applied to every binding
     * @param string|null $destination           Destination name applied to every binding
     * @param bool|null   $destinationIsExchange Flag applied to every binding
     *
     * @return Definition[] Sequential list, one definition per binding
     */
    protected function loadBindings($bindings, string $exchange = null, string $destination = null, bool $destinationIsExchange = null): array
    {
        $bindingDefs = [];

        foreach ($bindings as $bindingConfig) {
            $bindingDefs[] = $this->createBindingDef(
                $bindingConfig,
                $exchange,
                $destination,
                $destinationIsExchange
            );
        }

        return $bindingDefs;
    }
165
166
    /**
     * Registers a connection service and a channel service for every configured connection.
     *
     * For a connection key <key> this defines two public services:
     *  - "old_sound_rabbit_mq.connection.<key>", built by the connection factory's
     *    createConnection() and tagged "old_sound_rabbit_mq.connection";
     *  - "old_sound_rabbit_mq.channel.<key>", an AMQPChannel obtained from that connection
     *    through getChannelFromConnection().
     */
    protected function loadConnections()
    {
        // A single inline factory definition is shared by all connection definitions.
        $connectionFactoryDef = new Definition('%old_sound_rabbit_mq.connection_factory.class%');

        foreach ($this->config['connections'] as $key => $connection) {
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $classParam = $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.' . $connectionSuffix . '%'
                : '%old_sound_rabbit_mq.' . $connectionSuffix . '%';

            $definition = new Definition($classParam);
            $definition->setFactory([$connectionFactoryDef, 'createConnection']);
            $definition->setArguments([$classParam, $connection]);
            $definition->addTag('old_sound_rabbit_mq.connection');
            $definition->setPublic(true);

            $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $key);
            $this->container->setDefinition($connectionId, $definition);

            $channelDef = new Definition(AMQPChannel::class, [
                new Reference($connectionId),
            ]);
            $channelDef->setFactory([self::class, 'getChannelFromConnection']);
            $channelDef->setPublic(true);
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.channel.%s', $key), $channelDef);
        }
    }
197
198
    /**
     * Service factory used by the channel definitions: returns the result of
     * calling channel() on the given connection.
     *
     * @param AbstractConnection $connection
     *
     * @return mixed Whatever $connection->channel() returns
     */
    public static function getChannelFromConnection(AbstractConnection $connection)
    {
        $channel = $connection->channel();

        return $channel;
    }
202 26
203 22
    /**
     * Registers one service per configured producer as "old_sound_rabbit_mq.producer.<name>".
     *
     * In sandbox mode every producer is defined with the fallback class and nothing else is
     * configured. Otherwise each producer is public, tagged "old_sound_rabbit_mq.producer",
     * receives its connection's channel and its exchange as constructor arguments, gets its
     * "autoDeclare" property set, and has a logger injected when "logging" is enabled.
     */
    protected function loadProducers()
    {
        if ($this->config['sandbox']) {
            foreach (array_keys($this->config['producers']) as $producerName) {
                $this->container->setDefinition(
                    sprintf('old_sound_rabbit_mq.producer.%s', $producerName),
                    new Definition('%old_sound_rabbit_mq.fallback.class%')
                );
            }

            return;
        }

        // Fallback used when a producer does not configure "auto_declare":
        // enabled in every environment except "prod".
        $defaultAutoDeclare = $this->container->getParameter('kernel.environment') !== 'prod';

        foreach ($this->config['producers'] as $producerName => $producer) {
            $alias = sprintf('old_sound_rabbit_mq.producer.%s', $producerName);

            $definition = new Definition($producer['class']);
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.producer', ['producer' => $producerName]);

            // TODO: exchange_options / queue_options are not applied to producers yet, and
            // the traceable channel is not injected for the data collector.
            $definition->addArgument($this->createChannelReference($producer['connection']));
            $definition->addArgument($producer['exchange']);

            // Always set the property so the environment-based default actually applies
            // when the producer leaves "auto_declare" unset.
            $definition->setProperty('autoDeclare', $producer['auto_declare'] ?? $defaultAutoDeclare);

            $this->container->setDefinition($alias, $definition);
            if ($producer['logging']) {
                $this->injectLogger($alias);
            }
        }
    }
248 22
249 22
    /**
     * Returns a reference to the channel service "old_sound_rabbit_mq.channel.<connectionName>".
     *
     * @param string $connectionName Connection key the channel belongs to
     *
     * @return Reference
     */
    private function createChannelReference($connectionName): Reference
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $connectionName);

        return new Reference($channelId);
    }
253
254
    /**
     * Registers one service per configured consumer as "old_sound_rabbit_mq.consumer.<name>".
     *
     * Each consumer is public, tagged "old_sound_rabbit_mq.consumer", and receives as
     * constructor arguments the channel of its connection, the connection itself and,
     * when the data collector is enabled, a traceable channel. Every entry of
     * "consumeQueues" becomes a ConsumeOptions definition that is registered in the
     * container and passed to the consumer through consumeQueue(). A consumer without a
     * "connection" entry uses the connection named "default".
     */
    protected function loadConsumers()
    {
        foreach ($this->config['consumers'] as $consumerName => $consumer) {
            $alias = sprintf('old_sound_rabbit_mq.consumer.%s', $consumerName);
            $connectionName = $consumer['connection'] ?? 'default';

            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%', [
                $this->createChannelReference($connectionName),
            ]);
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.consumer', ['consumer' => $consumerName]);
            // TODO: alias "<alias>.serializer" to SerializerInterface and call setSerializer().

            $consumeQueues = $consumer['consumeQueues'];
            $queueConsumingTag = sprintf('old_sound_rabbit_mq.%s.queue_consuming', $connectionName);
            $queueConsumingBaseId = sprintf(
                'old_sound_rabbit_mq.%s.queue_consuming.%s',
                $connectionName,
                $consumerName
            );
            // A single queue keeps the historical id; with several queues the index is
            // appended so the definitions no longer overwrite each other in the container.
            $needsIndexSuffix = count($consumeQueues) > 1;

            foreach ($consumeQueues as $index => $consumeQueue) {
                $queueConsumingDef = new Definition(ConsumeOptions::class);
                // TODO: qos_prefetch_size, consumer_tag and no_local are not forwarded yet.
                $queueConsumingDef->setProperties([
                    'queueName' => $consumeQueue['queue'],
                    'receiver' => new Reference($consumeQueue['receiver']),
                    'qosPrefetchCount' => $consumeQueue['qos_prefetch_count'],
                    'batchCount' => $consumeQueue['batch_count'] ?? null,
                ]);
                $queueConsumingDef->addTag($queueConsumingTag);

                $queueConsumingId = $needsIndexSuffix
                    ? $queueConsumingBaseId . '.' . $index
                    : $queueConsumingBaseId;
                $this->container->setDefinition($queueConsumingId, $queueConsumingDef);
                $definition->addMethodCall('consumeQueue', [$queueConsumingDef]);
            }

            $definition->addMethodCall('setEventDispatcher', [
                new Reference('event_dispatcher', ContainerInterface::IGNORE_ON_INVALID_REFERENCE),
            ]);

            // TODO: apply "qos_options" (prefetch_size, prefetch_count, global) via setQosOptions().

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }
            if (isset($consumer['timeout_wait'])) {
                // setProperty() takes the value itself, not an argument list.
                $definition->setProperty('timeoutWait', $consumer['timeout_wait']);
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$consumer['graceful_max_execution']['exit_code']]
                );
            }

            // Use the defaulted connection name so a consumer without an explicit
            // "connection" references the same connection as its channel.
            $this->injectConnection($definition, $connectionName);
            if ($this->collectorEnabled) {
                $this->injectTraceableChannel($definition, $consumerName, $connectionName);
            }

            $this->container->setDefinition($alias, $definition);

            if ($consumer['logging']) {
                $this->injectLogger($alias);
            }
        }
    }
330
331 22
    /**
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks arguments such as
     * x-ha-policy. This restores the dashes in the keys of $config['arguments'].
     *
     * A configuration without an "arguments" entry is returned untouched. Arguments given
     * as a string (old configuration format) are first parsed into an array.
     *
     * @param array $config
     *
     * @return array The configuration with normalized argument keys
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $rawArguments = $config['arguments'];
        if (is_string($rawArguments)) {
            // support for old configuration
            $rawArguments = $this->argumentsStringAsArray($rawArguments);
        }

        $normalized = [];
        foreach ($rawArguments as $name => $value) {
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
359
360 22
    /**
     * Parses arguments given as a string (old configuration files) into an array.
     *
     * The string is split on ',' into pairs and each pair on ':' into
     * name, value and an optional type; the type defaults to 'S'.
     *
     * @deprecated
     * @param string $arguments
     * @return array Map of argument name to [type, value]
     */
    private function argumentsStringAsArray($arguments)
    {
        $parsed = [];

        foreach (explode(',', $arguments) as $pair) {
            $segments = explode(':', $pair);
            $type = $segments[2] ?? 'S';
            $parsed[$segments[0]] = [$type, $segments[1]];
        }

        return $parsed;
    }
383
384
    /**
     * Registers a private traceable channel for the given connection and appends a
     * reference to it to the constructor arguments of $definition.
     *
     * The channel id is also recorded in $this->channelIds, which load() later hands to
     * the data collector.
     *
     * NOTE(review): the id pattern "old_sound_rabbit_mq.channel.<name>" is the same one
     * loadConnections() uses with connection keys, so a $name equal to a connection key
     * would replace that connection's channel definition — confirm this is intended.
     *
     * @param Definition $definition     Definition receiving the channel reference
     * @param string     $name           Suffix used for the channel service id
     * @param string     $connectionName Connection the traceable channel is built on
     */
    protected function injectTraceableChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $channelDef = new Definition('%old_sound_rabbit_mq.traceable.channel.class%');
        $channelDef->setPublic(false);
        $channelDef->addTag('old_sound_rabbit_mq.traceable_channel');
        $this->injectConnection($channelDef, $connectionName);
        $this->container->setDefinition($channelId, $channelDef);

        $this->channelIds[] = $channelId;
        $definition->addArgument(new Reference($channelId));
    }
398 31
399
    /**
     * Appends a reference to "old_sound_rabbit_mq.connection.<connectionName>" to the
     * constructor arguments of the given definition.
     *
     * @param Definition $definition
     * @param string     $connectionName
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        $definition->addArgument(new Reference($connectionId));
    }
403
404
    /**
     * Returns the alias of this extension.
     *
     * @return string
     */
    public function getAlias()
    {
        return 'old_sound_rabbit_mq';
    }
408
409
    /**
     * Adds a setDequeuer() call to the callback service when its class implements
     * DequeuerAwareInterface. Does nothing when the container has no such service.
     *
     * TODO: add proper dequeuer aware call.
     *
     * @param string $callback Service id of the callback
     * @param string $name     Service id referenced as the dequeuer
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if ($this->container->has($callback)) {
            $callbackDef = $this->container->findDefinition($callback);
            $callbackClass = new \ReflectionClass($callbackDef->getClass());

            if ($callbackClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
                $callbackDef->addMethodCall('setDequeuer', [new Reference($name)]);
            }
        }
    }
428
429
    /**
     * Wires a logger into an already registered definition.
     *
     * The definition is tagged "monolog.logger" with channel "phpamqplib", an alias
     * "<definitionAlias>.loggeer" pointing at "logger" is created, and a setLogger() call
     * referencing that alias is added; the reference is ignored when it cannot be resolved.
     *
     * NOTE(review): the ".loggeer" suffix looks like a typo of ".logger"; it is kept
     * as-is here because it is a runtime service id.
     *
     * @param string $definitionAlias Service id of the definition to configure
     */
    private function injectLogger(string $definitionAlias)
    {
        $target = $this->container->getDefinition($definitionAlias);
        $target->addTag('monolog.logger', ['channel' => 'phpamqplib']);

        $loggerAlias = $definitionAlias . '.loggeer';
        $this->container->setAlias($loggerAlias, 'logger');

        $loggerRef = new Reference($loggerAlias, ContainerInterface::IGNORE_ON_INVALID_REFERENCE);
        $target->addMethodCall('setLogger', [$loggerRef]);
    }
439
440
    /**
     * Get default AMQP exchange options: an unnamed, passive "direct" exchange
     * that is not declared.
     *
     * TODO use
     *
     * @return array
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = [
            'name' => '',
            'type' => 'direct',
            'passive' => true,
            'declare' => false,
        ];

        return $defaults;
    }
454
455 31
    /**
     * Get default AMQP queue options: an unnamed queue that is not declared.
     *
     * TODO use
     *
     * @return array
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = [
            'name' => '',
            'declare' => false,
        ];

        return $defaults;
    }
467
}
468