Completed
Push — master ( 5a2db7...c7640a )
by Gaetano
06:47
created

KaliopQueueingPluginsSQSExtension   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 87
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 44.44%

Importance

Changes 0
Metric Value
wmc 14
lcom 1
cbo 7
dl 0
loc 87
ccs 20
cts 45
cp 0.4444
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A load() 0 13 1
A loadConnections() 0 8 2
C loadQueues() 0 54 11
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\SQSBundle\DependencyInjection;
4
5
use Symfony\Component\DependencyInjection\ContainerBuilder;
6
use Symfony\Component\Config\FileLocator;
7
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
8
use Symfony\Component\DependencyInjection\Loader;
9
use Symfony\Component\DependencyInjection\Definition;
10
use Symfony\Component\DependencyInjection\Reference;
11
12
/**
13
 * This is the class that loads and manages your bundle configuration
14
 * To learn more see {@link http://symfony.com/doc/current/cookbook/bundles/extension.html}
15
 *
16
 * Logic heavily inspired by the way that Oldsound/RabbitMqBundle does things
17
 */
18
class KaliopQueueingPluginsSQSExtension extends Extension
19
{
20
    protected $config = array();
21
    protected $container;
22
    protected $queueManagerService = 'kaliop_queueing.sqs.queue_manager';
23
24
    /**
25
     * {@inheritDoc}
26
     */
27 1
    public function load(array $configs, ContainerBuilder $container)
28
    {
29 1
        $this->container = $container;
30
31 1
        $loader = new Loader\YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
32 1
        $loader->load('services.yml');
33
34 1
        $configuration = new Configuration();
35 1
        $this->config = $this->processConfiguration($configuration, $configs);
36
37 1
        $this->loadConnections();
38 1
        $this->loadQueues();
39 1
    }
40
41 1
    protected function loadConnections()
42
    {
43
        // this is not so much a loading as a 'store definition for later access', really
44 1
        $definition = $this->container->findDefinition('kaliop_queueing.driver.sqs');
45 1
        foreach ($this->config['connections'] as $key => $def) {
46 1
            $definition->addMethodCall('registerConnection', array($key, $def));
47
        }
48 1
    }
49
50 1
    protected function loadQueues()
51
    {
52 1
        $qmDefinition = null;
53 1
        if ($this->container->hasDefinition($this->queueManagerService)) {
54 1
            $qmDefinition = $this->container->findDefinition($this->queueManagerService);
55
        }
56
57 1
        foreach ($this->config['queues'] as $key => $consumer) {
58
            if (!isset($this->config['connections'][$consumer['connection']])) {
59
                throw new \RuntimeException("SQS queue '$key' can not use connection '{$consumer['connection']}' because it is not defined in the connections section");
60
            }
61
62
            $pDefinition = new Definition('%kaliop_queueing.sqs.producer.class%', array($this->config['connections'][$consumer['connection']]));
63
            $pDefinition
64
                ->addMethodCall('setQueueUrl', array($consumer['queue_options']['name']))
65
            ;
66
            if ($consumer['queue_options']['message_group_id'] != null) {
67
                $pDefinition->addMethodCall('setMessageGroupId', array($consumer['queue_options']['message_group_id']));
68
            }
69
            $name = sprintf('kaliop_queueing.sqs.%s_producer', $key);
70
            $this->container->setDefinition($name, $pDefinition);
71
72
            $cDefinition = new Definition('%kaliop_queueing.sqs.consumer.class%', array($this->config['connections'][$consumer['connection']]));
73
            $cDefinition
74
                ->addMethodCall('setQueueUrl', array($consumer['queue_options']['name']))
75
                ->addMethodCall('setCallback', array(new Reference($consumer['callback'])));
76
            ;
77
            if (count($consumer['queue_options']['routing_keys'])) {
78
                $cDefinition->addMethodCall('setRoutingKey', array(reset($consumer['queue_options']['routing_keys'])));
79
            }
80
            if ($consumer['queue_options']['max_messages_per_request'] > 1) {
81
                $cDefinition->addMethodCall('setRequestBatchSize', array($consumer['queue_options']['max_messages_per_request']));
82
            }
83
            if ($consumer['queue_options']['request_timeout'] > 0) {
84
                $cDefinition->addMethodCall('setRequestTimeout', array($consumer['queue_options']['request_timeout']));
85
            }
86
            if ($consumer['queue_options']['gc_probability'] != 1) {
87
                $cDefinition->addMethodCall('setGCProbability', array($consumer['queue_options']['gc_probability']));
88
            }
89
            if ($consumer['queue_options']['polling_interval'] != 200000) {
90
                $cDefinition->addMethodCall('setPollingInterval', array($consumer['queue_options']['polling_interval']));
91
            }
92
            $name = sprintf('kaliop_queueing.sqs.%s_consumer', $key);
93
            $this->container->setDefinition($name, $cDefinition);
94
95
            //if (!$consumer['auto_setup_fabric']) {
96
            //    $definition->addMethodCall('disableAutoSetupFabric');
97
            //}
98
99
            if ($qmDefinition) {
100
                $qmDefinition->addMethodCall('registerQueue', array($key));
101
            }
102
        }
103 1
    }
104
}
105