SubscriberPass   A
last analyzed

Complexity

Total Complexity 4

Size/Duplication

Total Lines 84
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Importance

Changes 0
Metric Value
wmc 4
lcom 1
cbo 2
dl 0
loc 84
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 20 1
A process() 0 9 2
B buildDefinition() 0 32 1
1
<?php
2
3
namespace Cmobi\RabbitmqBundle\DependencyInjection\Compiler;
4
5
use Cmobi\RabbitmqBundle\Queue\Queue;
6
use Cmobi\RabbitmqBundle\Transport\Subscriber\ExchangeType;
7
use Cmobi\RabbitmqBundle\Transport\Subscriber\SubscriberQueueBag;
8
use Cmobi\RabbitmqBundle\Transport\Subscriber\SubscriberQueueCallback;
9
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
10
use Symfony\Component\DependencyInjection\ContainerBuilder;
11
use Symfony\Component\DependencyInjection\Definition;
12
13
class SubscriberPass implements CompilerPassInterface
14
{
15
    private $exchangeName;
16
    private $exchangeType;
17
    private $queueName;
18
    private $connectionName;
19
    private $serviceName;
20
    private $basicQos;
21
    private $arguments;
22
    private $jobs;
23
24
    public function __construct(
25
        $exchangeName,
26
        $type = ExchangeType::FANOUT,
27
        $queueName = null,
28
        $connectionName,
29
        $serviceName,
30
        $basicQos,
31
        array $arguments,
32
        $jobs = 1
33
    )
34
    {
35
        $this->exchangeName = $exchangeName;
36
        $this->exchangeType = $type;
37
        $this->queueName = $queueName;
38
        $this->connectionName = $connectionName;
39
        $this->serviceName = $serviceName;
40
        $this->basicQos = $basicQos;
41
        $this->arguments = $arguments;
42
        $this->jobs = $jobs;
43
    }
44
45
    /**
46
     * @param ContainerBuilder $container
47
     * @throws \Exception
48
     */
49
    public function process(ContainerBuilder $container)
50
    {
51
        for ($job = 0; $job < $this->jobs; $job++) {
52
            $definition = $this->buildDefinition($container);
53
            $definition->addTag('cmobi.subscriber');
54
            $jobName = sprintf('cmobi_rabbitmq.subscriber.%s_%s', $this->queueName, $job);
55
            $container->setDefinition($jobName, $definition);
56
        }
57
    }
58
59
    /**
60
     * @param ContainerBuilder $container
61
     * @return Definition
62
     * @throws \Exception
63
     */
64
    protected function buildDefinition(ContainerBuilder $container)
65
    {
66
        $connection = $container->getDefinition('cmobi_rabbitmq.connection.manager');
67
        $serviceDefinition = $container->getDefinition($this->serviceName);
68
        $queueBagDefinition = new Definition(
69
            SubscriberQueueBag::class,
70
            [
71
                'exchangeName' => $this->exchangeName,
72
                'type' => $this->exchangeType,
73
                'queueName' => $this->queueName,
74
                'basicQos' => $this->basicQos,
75
                'arguments' => $this->arguments
76
            ]
77
        );
78
        $queueCallbackDefinition = new Definition(
79
            SubscriberQueueCallback::class,
80
            [
81
                'queueService' => $serviceDefinition
82
            ]
83
        );
84
        $definition = new Definition(
85
            Queue::class,
86
            [
87
                'connectionManager' => $connection,
88
                'queueBag' => $queueBagDefinition,
89
                'connectionName' => $this->connectionName,
90
                'callback' => $queueCallbackDefinition
91
            ]
92
        );
93
94
        return $definition;
95
    }
96
}