@@ 12-97 (lines=86) @@ | ||
9 | use Symfony\Component\DependencyInjection\ContainerBuilder; |
|
10 | use Symfony\Component\DependencyInjection\Definition; |
|
11 | ||
/**
 * Compiler pass that registers the queue services backing one RPC server.
 *
 * For every job index from 0 up to (but excluding) the configured job count,
 * a Queue definition is stored in the container under the id
 * "cmobi_rabbitmq.rpc_server.<queueName>_<index>" and tagged "cmobi.rpc_server".
 */
class RpcServerPass implements CompilerPassInterface
{
    private $queueName;
    private $connectionName;
    private $serviceName;
    private $basicQos;
    private $durable;
    private $autoDelete;
    private $arguments;
    private $jobs;

    /**
     * Stores the settings that are later copied into the generated definitions.
     *
     * @param mixed $queueName      Passed to RpcQueueBag as "queueName"; also part of the service id.
     * @param mixed $connectionName Passed to Queue as "connectionName".
     * @param mixed $serviceName    Container id looked up and handed to RpcQueueCallback.
     * @param mixed $basicQos       Passed to RpcQueueBag as "basicQos".
     * @param mixed $durable        Passed to RpcQueueBag as "durable".
     * @param mixed $autoDelete     Passed to RpcQueueBag as "autoDelete".
     * @param array $arguments      Passed to RpcQueueBag as "arguments".
     * @param int   $jobs           How many Queue definitions process() registers.
     */
    public function __construct(
        $queueName,
        $connectionName,
        $serviceName,
        $basicQos,
        $durable,
        $autoDelete,
        array $arguments,
        $jobs = 1
    ) {
        $this->jobs = $jobs;
        $this->arguments = $arguments;
        $this->autoDelete = $autoDelete;
        $this->durable = $durable;
        $this->basicQos = $basicQos;
        $this->serviceName = $serviceName;
        $this->connectionName = $connectionName;
        $this->queueName = $queueName;
    }

    /**
     * Registers one tagged Queue definition per job.
     *
     * @param ContainerBuilder $container
     * @throws \Exception
     */
    public function process(ContainerBuilder $container)
    {
        $index = 0;

        while ($index < $this->jobs) {
            // A fresh definition is built for each job so every id gets its own object.
            $queueDefinition = $this->buildDefinition($container);
            $queueDefinition->addTag('cmobi.rpc_server');

            $container->setDefinition(
                sprintf('cmobi_rabbitmq.rpc_server.%s_%s', $this->queueName, $index),
                $queueDefinition
            );

            $index++;
        }
    }

    /**
     * Assembles the Queue definition together with its nested bag and callback definitions.
     *
     * @param ContainerBuilder $container
     * @return Definition
     * @throws \Exception
     */
    protected function buildDefinition(ContainerBuilder $container)
    {
        // The three lookups run first and in this order; each one can fail
        // inside ContainerBuilder::getDefinition().
        // NOTE(review): the Definition objects themselves (not references) are
        // used as constructor arguments below - confirm this is intended.
        $managerDefinition = $container->getDefinition('cmobi_rabbitmq.connection.manager');
        $loggerDefinition = $container->getDefinition('cmobi_rabbitmq.logger');
        $handlerDefinition = $container->getDefinition($this->serviceName);

        $bagArguments = [
            'queueName' => $this->queueName,
            'basicQos' => $this->basicQos,
            'durable' => $this->durable,
            'autoDelete' => $this->autoDelete,
            'arguments' => $this->arguments,
        ];

        $queueArguments = [
            'connectionManager' => $managerDefinition,
            'queueBag' => new Definition(RpcQueueBag::class, $bagArguments),
            'logger' => $loggerDefinition,
            'connectionName' => $this->connectionName,
            'callback' => new Definition(
                RpcQueueCallback::class,
                ['queueService' => $handlerDefinition]
            ),
        ];

        return new Definition(Queue::class, $queueArguments);
    }
}
@@ 13-98 (lines=86) @@ | ||
10 | use Symfony\Component\DependencyInjection\ContainerBuilder; |
|
11 | use Symfony\Component\DependencyInjection\Definition; |
|
12 | ||
/**
 * Compiler pass that registers the queue services backing one subscriber.
 *
 * For every job index from 0 up to (but excluding) the configured job count,
 * a Queue definition is stored in the container under the id
 * "cmobi_rabbitmq.subscriber.<queueName>_<index>" and tagged "cmobi.subscriber".
 *
 * NOTE(review): when $queueName is null the id becomes
 * "cmobi_rabbitmq.subscriber._<index>", so two passes without a queue name
 * would write to the same ids - confirm whether that can happen in practice.
 */
class SubscriberPass implements CompilerPassInterface
{
    private $exchangeName;
    private $exchangeType;
    private $queueName;
    private $connectionName;
    private $serviceName;
    private $basicQos;
    private $arguments;
    private $jobs;

    /**
     * Stores the settings that are later copied into the generated definitions.
     *
     * $type and $queueName deliberately carry no default value: both are
     * followed by required parameters, so a default could never be used by a
     * caller, and PHP 8.0+ reports such a signature as deprecated. Callers
     * pass ExchangeType::FANOUT / null explicitly when they want those values.
     *
     * @param mixed $exchangeName   Passed to SubscriberQueueBag as "exchangeName".
     * @param mixed $type           Passed to SubscriberQueueBag as "type" (e.g. ExchangeType::FANOUT).
     * @param mixed $queueName      Passed to SubscriberQueueBag as "queueName"; also part of the service id. May be null.
     * @param mixed $connectionName Passed to Queue as "connectionName".
     * @param mixed $serviceName    Container id looked up and handed to SubscriberQueueCallback.
     * @param mixed $basicQos       Passed to SubscriberQueueBag as "basicQos".
     * @param array $arguments      Passed to SubscriberQueueBag as "arguments".
     * @param int   $jobs           How many Queue definitions process() registers.
     */
    public function __construct(
        $exchangeName,
        $type,
        $queueName,
        $connectionName,
        $serviceName,
        $basicQos,
        array $arguments,
        $jobs = 1
    ) {
        $this->exchangeName = $exchangeName;
        $this->exchangeType = $type;
        $this->queueName = $queueName;
        $this->connectionName = $connectionName;
        $this->serviceName = $serviceName;
        $this->basicQos = $basicQos;
        $this->arguments = $arguments;
        $this->jobs = $jobs;
    }

    /**
     * Registers one tagged Queue definition per job.
     *
     * @param ContainerBuilder $container
     * @throws \Exception
     */
    public function process(ContainerBuilder $container)
    {
        for ($job = 0; $job < $this->jobs; $job++) {
            // A fresh definition is built for each job so every id gets its own object.
            $definition = $this->buildDefinition($container);
            $definition->addTag('cmobi.subscriber');
            $jobName = sprintf('cmobi_rabbitmq.subscriber.%s_%s', $this->queueName, $job);
            $container->setDefinition($jobName, $definition);
        }
    }

    /**
     * Assembles the Queue definition together with its nested bag and callback definitions.
     *
     * @param ContainerBuilder $container
     * @return Definition
     * @throws \Exception
     */
    protected function buildDefinition(ContainerBuilder $container)
    {
        // Each lookup can fail inside ContainerBuilder::getDefinition().
        // NOTE(review): the Definition objects themselves (not references) are
        // used as constructor arguments below - confirm this is intended.
        $connection = $container->getDefinition('cmobi_rabbitmq.connection.manager');
        $logger = $container->getDefinition('cmobi_rabbitmq.logger');
        $serviceDefinition = $container->getDefinition($this->serviceName);

        $queueBagDefinition = new Definition(
            SubscriberQueueBag::class,
            [
                'exchangeName' => $this->exchangeName,
                'type' => $this->exchangeType,
                'queueName' => $this->queueName,
                'basicQos' => $this->basicQos,
                'arguments' => $this->arguments,
            ]
        );

        $queueCallbackDefinition = new Definition(
            SubscriberQueueCallback::class,
            [
                'queueService' => $serviceDefinition,
            ]
        );

        return new Definition(
            Queue::class,
            [
                'connectionManager' => $connection,
                'queueBag' => $queueBagDefinition,
                'logger' => $logger,
                'connectionName' => $this->connectionName,
                'callback' => $queueCallbackDefinition,
            ]
        );
    }
}