createExchangeDefinition()   F
last analyzed

Complexity

Conditions 18
Paths 12352

Size

Total Lines 94
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 51
CRAP Score 18.0555

Importance

Changes 0
Metric Value
dl 0
loc 94
ccs 51
cts 54
cp 0.9444
rs 2
c 0
b 0
f 0
cc 18
eloc 55
nc 12352
nop 3
crap 18.0555

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace TreeHouse\QueueBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\Config\FileLocator;
7
use Symfony\Component\DependencyInjection\ContainerBuilder;
8
use Symfony\Component\DependencyInjection\ContainerInterface;
9
use Symfony\Component\DependencyInjection\Definition;
10
use Symfony\Component\DependencyInjection\DefinitionDecorator;
11
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
12
use Symfony\Component\DependencyInjection\Reference;
13
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
14
use TreeHouse\Queue\Amqp\ExchangeInterface;
15
use TreeHouse\Queue\Amqp\QueueInterface;
16
use TreeHouse\Queue\Processor\Retry\BackoffStrategy;
17
use TreeHouse\Queue\Processor\Retry\DeprioritizeStrategy;
18
use TreeHouse\Queue\Processor\Retry\RetryProcessor;
19
20
class TreeHouseQueueExtension extends Extension
21
{
22
    /**
23
     * @var string[]
24
     */
25
    private $connections = [];
26
27
    /**
28
     * @var string[]
29
     */
30
    private $exchanges = [];
31
32
    /**
33
     * @var string[]
34
     */
35
    private $queues = [];
36
37
    /**
38
     * @var string[]
39
     */
40
    private $publishers = [];
41
42
    /**
43
     * @var string[]
44
     */
45
    private $consumers = [];
46
47
    /**
48
     * @var array Map that links exchanges with a DLX counterpart
49
     */
50
    private $dlxs = [];
51
52
    /**
53
     * @inheritdoc
54
     */
55 29
    public function load(array $configs, ContainerBuilder $container)
56
    {
57 29
        $configuration = new Configuration();
58 29
        $config = $this->processConfiguration($configuration, $configs);
59
60 28
        $loader = new YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
61 28
        $loader->load('services.yml');
62
63 28
        $this->loadDriver($config, $container);
64 28
        $this->loadConnections($config, $container);
65 27
        $this->loadPublishers($config, $container);
66 27
        $this->loadConsumers($config, $container);
67 27
        $this->loadExchanges($config, $container);
68 27
        $this->loadQueues($config, $container);
69
70 27
        $this->setCreatedDefinitionsParameters($container);
71
72 27
        if (!$config['auto_flush']) {
73 1
            $container->removeDefinition('tree_house.queue.event_listener.queue');
74
        }
75 27
    }
76
77
    /**
78
     * @param array            $config
79
     * @param ContainerBuilder $container
80
     */
81 28
    private function loadDriver(array $config, ContainerBuilder $container)
82
    {
83 28
        $container->setParameter('tree_house.queue.driver', $config['driver']);
84 28
        $container->setAlias(
85 28
            'tree_house.amqp.factory',
86 28
            sprintf('tree_house.queue.driver.%s.factory', $config['driver'])
87
        );
88
89 28
        $classes = ['connection', 'channel', 'exchange', 'queue', 'publisher', 'factory'];
90 28
        foreach ($classes as $class) {
91 28
            $name = sprintf('tree_house.queue.%s.class', $class);
92 28
            if (!$container->hasParameter($name)) {
93 28
                $value = sprintf('tree_house.queue.driver.%s.%s.class', $config['driver'], $class);
94 28
                $container->setParameter($name, $container->getParameter($value));
95
            }
96
        }
97 28
    }
98
99
    /**
100
     * @param array            $config
101
     * @param ContainerBuilder $container
102
     *
103
     * @throws \LogicException
104
     */
105 28
    private function loadConnections(array $config, ContainerBuilder $container)
106
    {
107 28
        foreach ($config['connections'] as $name => $connection) {
108 28
            $this->createConnectionDefinition($name, $connection, $container);
109
        }
110
111
        // default the first connection if it wasn't explicitly set
112 28
        if (!$config['default_connection']) {
113 26
            reset($config['connections']);
114 26
            $config['default_connection'] = key($config['connections']);
115
        }
116
117
        // set a parameter and alias for the default connection
118 28
        $connectionId = sprintf('tree_house.queue.connection.%s', $config['default_connection']);
119 28
        if (!$container->hasDefinition($connectionId)) {
120 1
            throw new \LogicException(sprintf('Connection "%s" does not exist', $connectionId));
121
        }
122
123 27
        $container->setParameter('tree_house.queue.default_connection', $config['default_connection']);
124 27
        $container->setAlias('tree_house.queue.default_connection', $connectionId);
125 27
    }
126
127
    /**
128
     * @param array            $config
129
     * @param ContainerBuilder $container
130
     */
131 27
    private function loadPublishers(array $config, ContainerBuilder $container)
132
    {
133 27
        foreach ($config['publishers'] as $name => $publisher) {
134 14
            $this->createPublisherDefinition($name, $publisher, $container);
135
        }
136 27
    }
137
138
    /**
139
     * @param array            $config
140
     * @param ContainerBuilder $container
141
     */
142 27
    private function loadConsumers(array $config, ContainerBuilder $container)
143
    {
144 27
        foreach ($config['consumers'] as $name => $consumer) {
145 9
            $this->createConsumerDefinition($name, $consumer, $container);
146
        }
147 27
    }
148
149
    /**
150
     * @param array            $config
151
     * @param ContainerBuilder $container
152
     */
153 27
    private function loadExchanges(array $config, ContainerBuilder $container)
154
    {
155 27
        foreach ($config['exchanges'] as $name => $exchange) {
156 2
            $this->createExchangeDefinition($name, $exchange, $container);
157
        }
158 27
    }
159
160
    /**
161
     * @param array            $config
162
     * @param ContainerBuilder $container
163
     */
164 27
    private function loadQueues(array $config, ContainerBuilder $container)
165
    {
166 27
        foreach ($config['queues'] as $name => $queue) {
167 3
            $this->createQueueDefinition($name, $queue, $container);
168
        }
169 27
    }
170
171
    /**
172
     * @param string           $name
173
     * @param array            $config
174
     * @param ContainerBuilder $container
175
     *
176
     * @return string
177
     */
178 28
    private function createConnectionDefinition($name, array $config, ContainerBuilder $container)
179
    {
180 28
        $amqpFactory = new Reference('tree_house.amqp.factory');
181
182 28
        $definition = new Definition($container->getParameter('tree_house.queue.connection.class'));
183 28
        $definition->setFactory([$amqpFactory, 'createConnection']);
184 28
        $definition->addArgument($config['host']);
185 28
        $definition->addArgument((integer) $config['port']);
186 28
        $definition->addArgument($config['user']);
187 28
        $definition->addArgument($config['pass']);
188 28
        $definition->addArgument($config['vhost']);
189 28
        $definition->addArgument($config['params']);
190
191 28
        $connectionId = sprintf('tree_house.queue.connection.%s', $name);
192 28
        $container->setDefinition($connectionId, $definition);
193
194
        // create channel
195 28
        $definition = new Definition($container->getParameter('tree_house.queue.channel.class'));
196 28
        $definition->setFactory([$amqpFactory, 'createChannel']);
197 28
        $definition->addArgument(new Reference($connectionId));
198
199
        // TODO set qos and prefetch stuff
200
201 28
        $channelId = sprintf('tree_house.queue.channel.%s', $name);
202 28
        $container->setDefinition($channelId, $definition);
203
204 28
        $this->connections[$name] = $connectionId;
205
206 28
        return $connectionId;
207
    }
208
209
    /**
210
     * @param string           $name
211
     * @param array            $config
212
     * @param ContainerBuilder $container
213
     *
214
     * @return string
215
     */
216 16
    private function createExchangeDefinition($name, array $config, ContainerBuilder $container)
217
    {
218 16
        $amqpFactory = new Reference('tree_house.amqp.factory');
219
220 16
        $connection = $config['connection'] ?: $container->getParameter('tree_house.queue.default_connection');
221 16
        $channelId = sprintf('tree_house.queue.channel.%s', $connection);
222 16
        $channelAlias = sprintf('tree_house.queue.channel.%s', $name);
223
224
        // add alias if connection is named differently than exchange
225 16
        if ($name !== $connection) {
226 16
            $container->setAlias($channelAlias, $channelId);
227
        }
228
229 16
        $exchangeName = $config['name'] ?: $name;
230 16
        $exchangeType = $config['type'];
231 16
        $exchangeFlags = $this->getExchangeFlagsValue($config);
232 16
        $exchangeArguments = $config['arguments'];
233 16
        $autoDeclare = isset($config['auto_declare']) ? $config['auto_declare'] : true;
234
235
        // optionally create a delayed message exchange counterpart
236 16
        if (isset($config['delay']) && $config['delay']) {
237 14
            $exchangeArguments['x-delayed-type'] = $exchangeType;
238 14
            $exchangeType = ExchangeInterface::TYPE_DELAYED;
239
        }
240
241
        // create exchange
242 16
        $definition = new Definition($container->getParameter('tree_house.queue.exchange.class'));
243 16
        $definition->setFactory([$amqpFactory, 'createExchange']);
244 16
        $definition->addArgument(new Reference($channelAlias));
245 16
        $definition->addArgument($exchangeName);
246 16
        $definition->addArgument($exchangeType);
247 16
        $definition->addArgument($exchangeFlags);
248 16
        $definition->addArgument($exchangeArguments);
249
250 16
        if ($autoDeclare) {
251 15
            $definition->addMethodCall('declareExchange');
252
        }
253
254 16
        $exchangeId = sprintf('tree_house.queue.exchange.%s', $name);
255 16
        $container->setDefinition($exchangeId, $definition);
256
257 16
        $this->exchanges[$name] = [
258 16
            'id' => $exchangeId,
259 16
            'auto_declare' => $autoDeclare,
260
        ];
261
262
        // optionally create a dead letter exchange counterpart
263 16
        if (isset($config['dlx']['enabled']) && $config['dlx']['enabled']) {
264 16
            if (!isset($config['dlx']['name'])) {
265 14
                $config['dlx']['name'] = sprintf('%s.dead', $exchangeName);
266
            }
267
268 16
            if (!isset($config['dlx']['connection'])) {
269 16
                $config['dlx']['connection'] = $connection;
270
            }
271
272 16
            if (!isset($config['dlx']['auto_declare'])) {
273 16
                $config['dlx']['auto_declare'] = $autoDeclare;
274
            }
275
276 16
            $dlxName = $config['dlx']['name'];
277 16
            $dlxId = $this->createExchangeDefinition($dlxName, $config['dlx'], $container);
278
279 16
            $this->dlxs[$name] = $dlxId;
280
281
            // create queue to route this DLX to
282 16
            $queue = $config['dlx']['queue'];
283 16
            if (!isset($queue['name'])) {
284 16
                $queue['name'] = $dlxName;
285
            }
286
287 16
            if (!isset($queue['connection'])) {
288 16
                $queue['connection'] = $connection;
289
            }
290
291 16
            $hasBinding = false;
292 16
            foreach ($queue['bindings'] as $binding) {
293
                if ($binding['exchange'] === $dlxName) {
294
                    $hasBinding = true;
295
                    break;
296
                }
297
            }
298 16
            if (!$hasBinding) {
299 16
                $queue['bindings'][] = [
300 16
                    'exchange' => $dlxName,
301
                    'arguments' => [],
302
                ];
303
            }
304
305 16
            $this->createQueueDefinition($dlxName, $queue, $container);
306
        }
307
308 16
        return $exchangeId;
309
    }
310
311
    /**
312
     * @param ContainerBuilder $container
313
     * @param array            $config
314
     * @param string           $name
315
     *
316
     * @return string
317
     */
318 9
    private function createConsumerDefinition($name, array $config, ContainerBuilder $container)
319
    {
320
        // create the queue
321 9
        $queue = $config['queue'];
322
323 9
        if (!isset($queue['name'])) {
324 8
            $queue['name'] = $name;
325
        }
326
327 9
        $queueId = $this->createQueueDefinition($name, $queue, $container);
328
329
        // create the processor
330 9
        $processorId = $this->createProcessorDefinition($name, $config, $container);
331
332
        // create the consumer
333 9
        $definition = new DefinitionDecorator('tree_house.queue.consumer.prototype');
334 9
        $definition->addArgument(new Reference($queueId));
335 9
        $definition->addArgument(new Reference($processorId));
336 9
        $definition->addArgument(new Reference('event_dispatcher'));
337
338 9
        $consumerId = sprintf('tree_house.queue.consumer.%s', $name);
339 9
        $container->setDefinition($consumerId, $definition);
340
341 9
        $this->consumers[$name] = $consumerId;
342
343 9
        return $consumerId;
344
    }
345
346
    /**
347
     * @param string           $name
348
     * @param array            $config
349
     * @param ContainerBuilder $container
350
     *
351
     * @return string
352
     */
353 22
    private function createQueueDefinition($name, array $config, ContainerBuilder $container)
354
    {
355 22
        $amqpFactory = new Reference('tree_house.amqp.factory');
356
357 22
        $connection = $config['connection'] ?: $container->getParameter('tree_house.queue.default_connection');
358 22
        $queueName = $config['name'] ?: $name;
359 22
        $channelId = sprintf('tree_house.queue.channel.%s', $connection);
360 22
        $arguments = $config['arguments'];
361 22
        $autoDeclare = isset($config['auto_declare']) ? $config['auto_declare'] : true;
362
363
        // if there is an exchange with the same name, and it has a DLX configured, set this in the arguments
364 22
        if (!array_key_exists('x-dead-letter-exchange', $arguments) && $dlx = $this->getDeadLetterExchange($name, $config, $container)) {
365 6
            $arguments['x-dead-letter-exchange'] = $dlx;
366
        }
367
368
        // create queue
369 22
        $definition = new Definition($container->getParameter('tree_house.queue.queue.class'));
370 22
        $definition->setFactory([$amqpFactory, 'createQueue']);
371 22
        $definition->addArgument(new Reference($channelId));
372 22
        $definition->addArgument($queueName);
373 22
        $definition->addArgument($this->getQueueFlagsValue($config));
374 22
        $definition->addArgument($arguments);
375
376 22
        if ($autoDeclare) {
377 21
            $definition->addMethodCall('declareQueue');
378
        }
379
380 22
        if (empty($config['bindings'])) {
381
            // bind to the same exchange
382 10
            $config['bindings'][] = [
383 10
                'exchange' => $name,
384
                'routing_keys' => [],
385
                'arguments' => [],
386
            ];
387
        }
388
389 22
        foreach ($config['bindings'] as $binding) {
390
            // if nothing is set, bind without routing key
391 22
            if (empty($binding['routing_keys'])) {
392 21
                $binding['routing_keys'] = [null];
393
            }
394
395 22
            foreach ($binding['routing_keys'] as $routingKey) {
396 22
                $definition->addMethodCall('bind', [$binding['exchange'], $routingKey, $binding['arguments']]);
397
            }
398
        }
399
400 22
        $queueId = sprintf('tree_house.queue.queue.%s', $name);
401 22
        $container->setDefinition($queueId, $definition);
402
403 22
        $this->queues[$name] = [
404 22
            'id' => $queueId,
405 22
            'auto_declare' => $autoDeclare,
406
        ];
407
408 22
        return $queueId;
409
    }
410
411
    /**
412
     * @param string           $name
413
     * @param array            $config
414
     * @param ContainerBuilder $container
415
     *
416
     * @return string
417
     */
418 14
    private function createPublisherDefinition($name, array $config, ContainerBuilder $container)
419
    {
420
        // get the right channel for the exchange
421 14
        $exchange = $config['exchange'];
422 14
        $exchangeId = $this->createExchangeDefinition($name, $exchange, $container);
423
424
        // create message composer
425 14
        $composerId = $this->createMessageComposerDefinition($name, $config, $container);
426
427
        // create publisher
428 14
        $publisherId = sprintf('tree_house.queue.publisher.%s', $name);
429 14
        $publisher = new Definition($container->getParameter('tree_house.queue.publisher.class'));
430 14
        $publisher->setLazy(true);
431 14
        $publisher->addArgument(new Reference($exchangeId));
432 14
        $publisher->addArgument(new Reference($composerId));
433
434 14
        $container->setDefinition($publisherId, $publisher);
435
436 14
        $this->publishers[$name] = $publisherId;
437
438 14
        return $publisherId;
439
    }
440
441
    /**
442
     * @param string           $name
443
     * @param array            $config
444
     * @param ContainerBuilder $container
445
     *
446
     * @return string
447
     */
448 14
    private function createMessageComposerDefinition($name, array $config, ContainerBuilder $container)
449
    {
450 14
        $composerId = sprintf('tree_house.queue.composer.%s', $name);
451 14
        $composer = $config['composer'];
452
453
        // resolve service
454 14
        if (substr($composer, 0, 1) === '@') {
455 1
            $container->setAlias($composerId, ltrim($composer, '@'));
456
        } else {
457
            // resolve parameter
458 13
            if (substr($composer, 0, 1) === '%') {
459 13
                $composer = $container->getParameter(substr($composer, 1, -1));
460
            }
461
462
            // create serializer first
463 13
            $serializerId = $this->createMessageSerializerDefinition($name, $config['serializer'], $container);
464
465 13
            $composerDef = new Definition($composer);
466 13
            $composerDef->addArgument(new Reference($serializerId));
467 13
            $container->setDefinition($composerId, $composerDef);
468
        }
469
470 14
        return $composerId;
471
    }
472
473
    /**
474
     * @param string           $name
475
     * @param string           $serializerClass
476
     * @param ContainerBuilder $container
477
     *
478
     * @return string
479
     */
480 13
    private function createMessageSerializerDefinition($name, $serializerClass, ContainerBuilder $container)
481
    {
482 13
        $serializerId = sprintf('tree_house.queue.serializer.%s', $name);
483
484
        // resolve service
485 13
        if (substr($serializerClass, 0, 1) === '@') {
486 12
            $container->setAlias($serializerId, ltrim($serializerClass, '@'));
487
488 12
            return $serializerId;
489
        } else {
490 1
            $serializer = new Definition($serializerClass);
491 1
            $container->setDefinition($serializerId, $serializer);
492
493 1
            return $serializerId;
494
        }
495
    }
496
497
    /**
498
     * @param string           $name
499
     * @param array            $config
500
     * @param ContainerBuilder $container
501
     *
502
     * @return string
503
     */
504 9
    private function createProcessorDefinition($name, array $config, ContainerBuilder $container)
505
    {
506 9
        $processorId = sprintf('tree_house.queue.processor.%s', $name);
507
508 9
        if (substr($config['processor'], 0, 1) === '@') {
509 2
            $service = ltrim($config['processor'], '@');
510
        } else {
511 7
            $service = new Definition($config['processor']);
512 7
            $service->setPublic(false);
513
        }
514
515
        // decorate the process with a retry processor if needd
516 9
        $service = $this->decorateRetryProcessor($name, $config['retry'], $service, $container);
517
518 9
        if (is_string($service)) {
519 1
            $container->setAlias($processorId, $service);
520
        } else {
521 8
            $container->setDefinition($processorId, $service);
522
        }
523
524 9
        return $processorId;
525
    }
526
527
    /**
528
     * @param string            $name
529
     * @param array             $config
530
     * @param string|Definition $service
531
     * @param ContainerBuilder  $container
532
     *
533
     * @return Definition
534
     */
535 9
    private function decorateRetryProcessor($name, array $config, $service, ContainerBuilder $container)
536
    {
537
        // skip if we only use 1 attempt
538 9
        if ($config['attempts'] < 2) {
539 4
            return $service;
540
        }
541
542 5
        $publisherName = $config['publisher'] ?: $name;
543 5
        $publisherId = sprintf('tree_house.queue.publisher.%s', $publisherName);
544
545 5
        if (!$container->hasDefinition($publisherId)) {
546
            throw new InvalidConfigurationException(sprintf('There is no publisher named "%s" configured.', $publisherName));
547
        }
548
549
        // decorate the processor
550 5
        $strategy = $this->createRetryStrategyDefinition($config['strategy'], $publisherId);
551
552 5
        $retry = new Definition(RetryProcessor::class);
553 5
        $retry->addArgument(is_string($service) ? new Reference($service) : $service);
554 5
        $retry->addArgument(is_string($strategy) ? new Reference($strategy) : $strategy);
555 5
        $retry->addArgument(new Reference('logger', ContainerInterface::NULL_ON_INVALID_REFERENCE));
556 5
        $retry->addMethodCall('setMaxAttempts', [$config['attempts']]);
557
558 5
        return $retry;
559
    }
560
561
    /**
562
     * @param array  $config
563
     * @param string $publisherId
564
     *
565
     * @return Definition
566
     */
567 5
    private function createRetryStrategyDefinition(array $config, $publisherId)
568
    {
569 5
        switch ($config['type']) {
570 5
            case 'backoff':
571 3
                $strategy = new Definition(BackoffStrategy::class);
572 3
                $strategy->addArgument(new Reference($publisherId));
573
574 3
                break;
575 2
            case 'deprioritize':
576 2
                $strategy = new Definition(DeprioritizeStrategy::class);
577 2
                $strategy->addArgument(new Reference($publisherId));
578
579 2
                break;
580
            default:
581
                throw new InvalidConfigurationException(sprintf('Unsupported retry strategy: "%s"', $config['type']));
582
        }
583
584 5
        $strategy->setPublic(false);
585
586 5
        return $strategy;
587
    }
588
589
    /**
590
     * @param ContainerBuilder $container
591
     */
592 27
    private function setCreatedDefinitionsParameters(ContainerBuilder $container)
593
    {
594 27
        $container->setParameter('tree_house.queue.connections', $this->connections);
595 27
        $container->setParameter('tree_house.queue.exchanges', $this->exchanges);
596 27
        $container->setParameter('tree_house.queue.queues', $this->queues);
597 27
        $container->setParameter('tree_house.queue.publishers', $this->publishers);
598 27
        $container->setParameter('tree_house.queue.consumers', $this->consumers);
599 27
    }
600
601
    /**
602
     * @param string           $name
603
     * @param array            $config
604
     * @param ContainerBuilder $container
605
     *
606
     * @return null|string
607
     */
608 22
    private function getDeadLetterExchange($name, array $config, ContainerBuilder $container)
609
    {
610 22
        if (isset($config['dlx'])) {
611 1
            return $config['dlx'];
612
        }
613
614 22
        if (!isset($this->dlxs[$name])) {
615 22
            return null;
616
        }
617
618 5
        $dlx = $container->getDefinition($this->dlxs[$name]);
619
620 5
        return $dlx->getArgument(1);
621
    }
622
623
    /**
624
     * @param array $exchange
625
     *
626
     * @return int
627
     */
628 16
    private function getExchangeFlagsValue(array $exchange)
629
    {
630 16
        $flags = ExchangeInterface::NOPARAM;
631
632 16
        if ($exchange['durable']) {
633 16
            $flags |= ExchangeInterface::DURABLE;
634
        }
635
636 16
        if ($exchange['passive']) {
637 2
            $flags |= ExchangeInterface::PASSIVE;
638
        }
639
640 16
        return $flags;
641
    }
642
643
    /**
644
     * @param array $queue
645
     *
646
     * @return int
647
     */
648 22
    private function getQueueFlagsValue(array $queue)
649
    {
650 22
        $flags = QueueInterface::NOPARAM;
651
652 22
        if ($queue['durable']) {
653 21
            $flags |= QueueInterface::DURABLE;
654
        }
655
656 22
        if ($queue['passive']) {
657 2
            $flags |= QueueInterface::PASSIVE;
658
        }
659
660 22
        if ($queue['exclusive']) {
661 2
            $flags |= QueueInterface::EXCLUSIVE;
662
        }
663
664 22
        if ($queue['auto_delete']) {
665 2
            $flags |= QueueInterface::AUTODELETE;
666
        }
667
668 22
        return $flags;
669
    }
670
}
671