Code Duplication    Length = 86-86 lines in 2 locations

DependencyInjection/Compiler/RpcServerPass.php 1 location

@@ 12-97 (lines=86) @@
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Definition;
11
12
/**
 * Compiler pass that registers one Queue service definition per configured
 * job for a single RPC server queue.
 *
 * Every registered definition is tagged "cmobi.rpc_server" and stored under
 * the id "cmobi_rabbitmq.rpc_server.<queueName>_<jobIndex>".
 */
class RpcServerPass implements CompilerPassInterface
{
    /** @var mixed Queue name; also part of the generated service ids. */
    private $queueName;
    /** @var mixed Connection name handed to the Queue definition. */
    private $connectionName;
    /** @var mixed Id of the container service wrapped by the RPC callback. */
    private $serviceName;
    /** @var mixed "basicQos" argument of the queue bag. */
    private $basicQos;
    /** @var mixed "durable" argument of the queue bag. */
    private $durable;
    /** @var mixed "autoDelete" argument of the queue bag. */
    private $autoDelete;
    /** @var array "arguments" argument of the queue bag. */
    private $arguments;
    /** @var mixed Number of Queue definitions to register (loop upper bound). */
    private $jobs;

    /**
     * Stores the queue settings that are later turned into service definitions.
     *
     * @param mixed $queueName
     * @param mixed $connectionName
     * @param mixed $serviceName
     * @param mixed $basicQos
     * @param mixed $durable
     * @param mixed $autoDelete
     * @param array $arguments
     * @param mixed $jobs How many definitions process() registers; defaults to 1.
     */
    public function __construct(
        $queueName,
        $connectionName,
        $serviceName,
        $basicQos,
        $durable,
        $autoDelete,
        array $arguments,
        $jobs = 1
    ) {
        // Queue description.
        $this->queueName = $queueName;
        $this->basicQos = $basicQos;
        $this->durable = $durable;
        $this->autoDelete = $autoDelete;
        $this->arguments = $arguments;

        // Wiring information.
        $this->connectionName = $connectionName;
        $this->serviceName = $serviceName;
        $this->jobs = $jobs;
    }

    /**
     * Registers a freshly built, tagged Queue definition for each job index
     * from 0 up to (but excluding) the configured number of jobs.
     *
     * @param ContainerBuilder $container
     * @throws \Exception
     */
    public function process(ContainerBuilder $container)
    {
        $index = 0;

        while ($index < $this->jobs) {
            // A new definition is built on every pass so each job gets its own.
            $queueDefinition = $this->buildDefinition($container);
            $queueDefinition->addTag('cmobi.rpc_server');

            $serviceId = sprintf('cmobi_rabbitmq.rpc_server.%s_%s', $this->queueName, $index);
            $container->setDefinition($serviceId, $queueDefinition);

            $index++;
        }
    }

    /**
     * Assembles the Queue definition: connection manager, queue bag, logger,
     * connection name and callback, in that argument order.
     *
     * @param ContainerBuilder $container
     * @return Definition
     * @throws \Exception
     */
    protected function buildDefinition(ContainerBuilder $container)
    {
        // Fetch the three existing definitions first, in this exact order.
        $connectionManager = $container->getDefinition('cmobi_rabbitmq.connection.manager');
        $loggerDefinition = $container->getDefinition('cmobi_rabbitmq.logger');
        $handlerDefinition = $container->getDefinition($this->serviceName);

        $bagArguments = [
            'queueName' => $this->queueName,
            'basicQos' => $this->basicQos,
            'durable' => $this->durable,
            'autoDelete' => $this->autoDelete,
            'arguments' => $this->arguments,
        ];
        $bagDefinition = new Definition(RpcQueueBag::class, $bagArguments);

        $callbackDefinition = new Definition(
            RpcQueueCallback::class,
            ['queueService' => $handlerDefinition]
        );

        return new Definition(
            Queue::class,
            [
                'connectionManager' => $connectionManager,
                'queueBag' => $bagDefinition,
                'logger' => $loggerDefinition,
                'connectionName' => $this->connectionName,
                'callback' => $callbackDefinition,
            ]
        );
    }
}

DependencyInjection/Compiler/SubscriberPass.php 1 location

@@ 13-98 (lines=86) @@
10
use Symfony\Component\DependencyInjection\ContainerBuilder;
11
use Symfony\Component\DependencyInjection\Definition;
12
13
/**
 * Compiler pass that registers one Queue service definition per configured
 * job for a single subscriber (exchange/queue pair).
 *
 * Every registered definition is tagged "cmobi.subscriber" and stored under
 * the id "cmobi_rabbitmq.subscriber.<queueName>_<jobIndex>".
 */
class SubscriberPass implements CompilerPassInterface
{
    /** @var mixed "exchangeName" argument of the queue bag. */
    private $exchangeName;
    /** @var mixed "type" argument of the queue bag. */
    private $exchangeType;
    /** @var mixed Queue name (may be null); also part of the generated service ids. */
    private $queueName;
    /** @var mixed Connection name handed to the Queue definition. */
    private $connectionName;
    /** @var mixed Id of the container service wrapped by the subscriber callback. */
    private $serviceName;
    /** @var mixed "basicQos" argument of the queue bag. */
    private $basicQos;
    /** @var array "arguments" argument of the queue bag. */
    private $arguments;
    /** @var mixed Number of Queue definitions to register (loop upper bound). */
    private $jobs;

    /**
     * Stores the subscriber settings that are later turned into service definitions.
     *
     * $type and $queueName previously declared defaults (ExchangeType::FANOUT
     * and null) although required parameters follow them. Such defaults can
     * never take effect and trigger a deprecation notice on PHP 8.0+, so they
     * were dropped; callers always had to pass both values explicitly.
     *
     * @param mixed $exchangeName
     * @param mixed $type
     * @param mixed $queueName Passing null explicitly is still accepted.
     * @param mixed $connectionName
     * @param mixed $serviceName
     * @param mixed $basicQos
     * @param array $arguments
     * @param mixed $jobs How many definitions process() registers; defaults to 1.
     */
    public function __construct(
        $exchangeName,
        $type,
        $queueName,
        $connectionName,
        $serviceName,
        $basicQos,
        array $arguments,
        $jobs = 1
    ) {
        $this->exchangeName = $exchangeName;
        $this->exchangeType = $type;
        $this->queueName = $queueName;
        $this->connectionName = $connectionName;
        $this->serviceName = $serviceName;
        $this->basicQos = $basicQos;
        $this->arguments = $arguments;
        $this->jobs = $jobs;
    }

    /**
     * Registers a freshly built, tagged Queue definition for each job index
     * from 0 up to (but excluding) the configured number of jobs.
     *
     * @param ContainerBuilder $container
     * @throws \Exception
     */
    public function process(ContainerBuilder $container)
    {
        for ($job = 0; $job < $this->jobs; $job++) {
            $definition = $this->buildDefinition($container);
            $definition->addTag('cmobi.subscriber');

            // NOTE(review): with a null queue name this id becomes
            // "cmobi_rabbitmq.subscriber._<job>", so two such subscribers would
            // presumably overwrite each other's definitions - confirm with callers.
            $jobName = sprintf('cmobi_rabbitmq.subscriber.%s_%s', $this->queueName, $job);
            $container->setDefinition($jobName, $definition);
        }
    }

    /**
     * Assembles the Queue definition: connection manager, queue bag, logger,
     * connection name and callback, in that argument order.
     *
     * @param ContainerBuilder $container
     * @return Definition
     * @throws \Exception
     */
    protected function buildDefinition(ContainerBuilder $container)
    {
        $connection = $container->getDefinition('cmobi_rabbitmq.connection.manager');
        $logger = $container->getDefinition('cmobi_rabbitmq.logger');
        $serviceDefinition = $container->getDefinition($this->serviceName);

        $queueBagDefinition = new Definition(
            SubscriberQueueBag::class,
            [
                'exchangeName' => $this->exchangeName,
                'type' => $this->exchangeType,
                'queueName' => $this->queueName,
                'basicQos' => $this->basicQos,
                'arguments' => $this->arguments,
            ]
        );
        $queueCallbackDefinition = new Definition(
            SubscriberQueueCallback::class,
            [
                'queueService' => $serviceDefinition,
            ]
        );

        return new Definition(
            Queue::class,
            [
                'connectionManager' => $connection,
                'queueBag' => $queueBagDefinition,
                'logger' => $logger,
                'connectionName' => $this->connectionName,
                'callback' => $queueCallbackDefinition,
            ]
        );
    }
}