1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\DependencyInjection\Compiler; |
4
|
|
|
|
5
|
|
|
use Dtc\QueueBundle\Model\Job; |
6
|
|
|
use Dtc\QueueBundle\Model\Run; |
7
|
|
|
use Pheanstalk\Pheanstalk; |
8
|
|
|
use Symfony\Component\DependencyInjection\Alias; |
9
|
|
|
use Symfony\Component\DependencyInjection\ContainerBuilder; |
10
|
|
|
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; |
11
|
|
|
use Symfony\Component\DependencyInjection\Definition; |
12
|
|
|
use Symfony\Component\DependencyInjection\Reference; |
13
|
|
|
|
14
|
|
|
/**
 * Compiler pass that wires the queue bundle's services together.
 *
 * It aliases the configured default job manager, registers the optional
 * Beanstalkd / RabbitMQ connection services, resolves and validates the
 * Job / Run model classes, attaches every tagged worker and event subscriber,
 * and aliases the Doctrine managers the bundle should use.
 */
class WorkerCompilerPass implements CompilerPassInterface
{
    /**
     * Entry point invoked by the container while it is being compiled.
     *
     * Does nothing when the 'dtc_queue.worker_manager' definition is absent.
     *
     * @param ContainerBuilder $container
     *
     * @throws \Exception                if no job manager definition exists for the configured default
     *                                   manager type, or if a configured model class is invalid
     * @throws \InvalidArgumentException if a service tagged 'dtc_queue.worker' does not extend the Worker model
     */
    public function process(ContainerBuilder $container)
    {
        if (false === $container->hasDefinition('dtc_queue.worker_manager')) {
            return;
        }

        $defaultManagerType = $container->getParameter('dtc_queue.default_manager');
        $defaultManagerId = 'dtc_queue.job_manager.'.$defaultManagerType;
        if (!$container->hasDefinition($defaultManagerId)) {
            throw new \Exception("No job manager found for dtc_queue.job_manager.$defaultManagerType");
        }

        // Expose the selected manager under the generic service id.
        $container->setAlias('dtc_queue.job_manager', new Alias($defaultManagerId));

        // Set up the optional backends if their configuration is present.
        $this->setupBeanstalkd($container);
        $this->setupRabbitMQ($container);

        $workerManager = $container->getDefinition('dtc_queue.worker_manager');
        $jobManagerRef = [new Reference('dtc_queue.job_manager')];

        // Resolve the model classes and publish the resolved names as parameters.
        $jobClass = $this->getJobClass($container);
        $jobClassArchive = $this->getJobClassArchive($container);
        $container->setParameter('dtc_queue.class_job', $jobClass);
        $container->setParameter('dtc_queue.class_job_archive', $jobClassArchive);
        $container->setParameter('dtc_queue.class_run', $this->getRunClass($container));
        $container->setParameter('dtc_queue.class_run_archive', $this->getRunArchiveClass($container));

        // Add each worker to the worker manager and give it what it needs to work.
        $workerClass = 'Dtc\QueueBundle\Model\Worker';
        foreach ($container->findTaggedServiceIds('dtc_queue.worker') as $id => $attributes) {
            $worker = $container->getDefinition($id);

            $refClass = new \ReflectionClass($worker->getClass());
            if (!$refClass->isSubclassOf($workerClass)) {
                throw new \InvalidArgumentException(sprintf('Service "%s" must extend class "%s".', $id, $workerClass));
            }

            // Give each worker access to the job manager and the job class name.
            $worker->addMethodCall('setJobManager', $jobManagerRef);
            $worker->addMethodCall('setJobClass', [$jobClass]);

            $workerManager->addMethodCall('addWorker', [new Reference($id)]);
        }

        $eventDispatcher = $container->getDefinition('dtc_queue.event_dispatcher');
        foreach ($container->findTaggedServiceIds('dtc_queue.event_subscriber') as $id => $attributes) {
            // Pass a Reference (as is done for workers above) rather than the
            // Definition object itself, so the dispatcher receives the
            // registered service instead of an inlined copy of its definition.
            $eventDispatcher->addMethodCall('addSubscriber', [new Reference($id)]);
        }

        $this->setupDoctrineManagers($container);
    }

    /**
     * Sets up the beanstalkd connection service if a host is configured.
     *
     * Registers 'dtc_queue.beanstalkd' and injects it (and the tube name, when
     * configured) into the 'dtc_queue.job_manager.beanstalkd' definition.
     *
     * @param ContainerBuilder $container
     */
    public function setupBeanstalkd(ContainerBuilder $container)
    {
        if (!$container->hasParameter('dtc_queue.beanstalkd.host')) {
            return;
        }

        $connection = new Definition('Pheanstalk\\Pheanstalk', [$container->getParameter('dtc_queue.beanstalkd.host')]);
        $container->setDefinition('dtc_queue.beanstalkd', $connection);

        $jobManager = $container->getDefinition('dtc_queue.job_manager.beanstalkd');
        $jobManager->addMethodCall('setBeanstalkd', [new Reference('dtc_queue.beanstalkd')]);
        if ($container->hasParameter('dtc_queue.beanstalkd.tube')) {
            $jobManager->addMethodCall('setTube', [$container->getParameter('dtc_queue.beanstalkd.tube')]);
        }
    }

    /**
     * Aliases the configured Doctrine ODM / ORM managers for the bundle.
     *
     * Each alias is only created when the corresponding Doctrine service is
     * known to the container.
     *
     * @param ContainerBuilder $container
     */
    public function setupDoctrineManagers(ContainerBuilder $container)
    {
        $documentManager = $container->getParameter('dtc_queue.document_manager');
        $odmManager = "doctrine_mongodb.odm.{$documentManager}_document_manager";
        if ($container->has($odmManager)) {
            $container->setAlias('dtc_queue.document_manager', $odmManager);
        }

        $entityManager = $container->getParameter('dtc_queue.entity_manager');
        $ormManager = "doctrine.orm.{$entityManager}_entity_manager";
        if ($container->has($ormManager)) {
            $container->setAlias('dtc_queue.entity_manager', $ormManager);
        }
    }

    /**
     * Sets up the RabbitMQ connection service if it is configured.
     *
     * Registers 'dtc_queue.rabbit_mq' (SSL or plain stream connection) and
     * injects it, together with the queue and exchange arguments, into the
     * 'dtc_queue.job_manager.rabbit_mq' definition.
     *
     * @param ContainerBuilder $container
     */
    public function setupRabbitMQ(ContainerBuilder $container)
    {
        if (!$container->hasParameter('dtc_queue.rabbit_mq')) {
            return;
        }

        $rabbitMqConfig = $container->getParameter('dtc_queue.rabbit_mq');
        $arguments = [
            $rabbitMqConfig['host'],
            $rabbitMqConfig['port'],
            $rabbitMqConfig['user'],
            $rabbitMqConfig['password'],
            $rabbitMqConfig['vhost'],
        ];

        $hasOptions = $container->hasParameter('dtc_queue.rabbit_mq.options');
        $useSsl = $container->hasParameter('dtc_queue.rabbit_mq.ssl') && $container->getParameter('dtc_queue.rabbit_mq.ssl');

        if ($useSsl) {
            $class = 'PhpAmqpLib\\Connection\\AMQPSSLConnection';
            // The SSL options argument is always supplied (empty when not
            // configured) so a following options array lands in the right slot.
            $arguments[] = $container->hasParameter('dtc_queue.rabbit_mq.ssl_options')
                ? $container->getParameter('dtc_queue.rabbit_mq.ssl_options')
                : [];
            if ($hasOptions) {
                $arguments[] = $container->getParameter('dtc_queue.rabbit_mq.options');
            }
        } else {
            $class = 'PhpAmqpLib\\Connection\\AMQPStreamConnection';
            if ($hasOptions) {
                // The stream connection takes its options as positional
                // arguments, so they are expanded one by one.
                $options = $container->getParameter('dtc_queue.rabbit_mq.options');
                $this->setRabbitMqOptionsPt1($arguments, $options);
                $this->setRabbitMqOptionsPt2($arguments, $options);
            }
        }

        $container->setDefinition('dtc_queue.rabbit_mq', new Definition($class, $arguments));

        $jobManager = $container->getDefinition('dtc_queue.job_manager.rabbit_mq');
        $jobManager->addMethodCall('setAMQPConnection', [new Reference('dtc_queue.rabbit_mq')]);
        $jobManager->addMethodCall('setQueueArgs', array_values($rabbitMqConfig['queue_args']));
        $jobManager->addMethodCall('setExchangeArgs', array_values($rabbitMqConfig['exchange_args']));
    }

    /**
     * Appends the first group of positional connection options to $arguments.
     *
     * Order: insist, login_method, login_response, locale. Each falls back to
     * its default when the option is missing or null.
     * NOTE(review): the order is presumed to match the AMQPStreamConnection
     * constructor of the installed php-amqplib version - confirm on upgrade.
     *
     * @param array $arguments argument list being built (modified in place)
     * @param array $options   configured options, keyed by option name
     */
    public function setRabbitMqOptionsPt1(array &$arguments, array $options)
    {
        $this->appendOptions($arguments, $options, [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
        ]);
    }

    /**
     * Appends the second group of positional connection options to $arguments.
     *
     * Order: connection_timeout, read_write_timeout, context, keepalive,
     * heartbeat. Each falls back to its default when the option is missing or
     * null.
     *
     * @param array $arguments argument list being built (modified in place)
     * @param array $options   configured options, keyed by option name
     */
    public function setRabbitMqOptionsPt2(array &$arguments, array $options)
    {
        $this->appendOptions($arguments, $options, [
            'connection_timeout' => 3.0,
            'read_write_timeout' => 3.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 0,
        ]);
    }

    /**
     * Determines the job class based on the queue manager type.
     *
     * Uses the 'dtc_queue.class_job' parameter when it is non-empty, otherwise
     * the bundle class matching 'dtc_queue.default_manager'.
     *
     * @param ContainerBuilder $container
     *
     * @return mixed|string
     *
     * @throws \Exception if no class can be determined, the class cannot be found,
     *                    or it is not a Dtc\QueueBundle\Model\Job
     */
    public function getJobClass(ContainerBuilder $container)
    {
        $jobClass = $container->getParameter('dtc_queue.class_job');
        if (!$jobClass) {
            switch ($defaultType = $container->getParameter('dtc_queue.default_manager')) {
                case 'mongodb':
                    $jobClass = 'Dtc\\QueueBundle\\Document\\Job';
                    break;
                case 'beanstalkd':
                    $jobClass = 'Dtc\\QueueBundle\\Beanstalkd\\Job';
                    break;
                case 'rabbit_mq':
                    $jobClass = 'Dtc\\QueueBundle\\RabbitMQ\\Job';
                    break;
                case 'orm':
                    $jobClass = 'Dtc\\QueueBundle\\Entity\\Job';
                    break;
                default:
                    throw new \Exception("Unknown default_manager type $defaultType - please specify a Job class in the 'class' configuration parameter");
            }
        }

        $this->assertClassExtends($jobClass, 'Job', 'Dtc\\QueueBundle\\Model\\Job');

        return $jobClass;
    }

    /**
     * Determines the run class based on the queue manager type.
     *
     * Uses the 'dtc_queue.class_run' parameter when it is defined and
     * non-empty, otherwise the bundle class matching
     * 'dtc_queue.default_manager' (the plain model class for any other type).
     *
     * @param ContainerBuilder $container
     *
     * @return mixed|string
     *
     * @throws \Exception if the class cannot be found or is not a Dtc\QueueBundle\Model\Run
     */
    public function getRunClass(ContainerBuilder $container)
    {
        $runClass = $container->hasParameter('dtc_queue.class_run') ? $container->getParameter('dtc_queue.class_run') : null;
        if (!$runClass) {
            switch ($container->getParameter('dtc_queue.default_manager')) {
                case 'mongodb':
                    $runClass = 'Dtc\\QueueBundle\\Document\\Run';
                    break;
                case 'orm':
                    $runClass = 'Dtc\\QueueBundle\\Entity\\Run';
                    break;
                default:
                    $runClass = 'Dtc\\QueueBundle\\Model\\Run';
            }
        }

        $this->assertClassExtends($runClass, 'Run', 'Dtc\\QueueBundle\\Model\\Run');

        return $runClass;
    }

    /**
     * Determines the run archive class based on the queue manager type.
     *
     * Uses the 'dtc_queue.class_run_archive' parameter when it is defined and
     * non-empty, otherwise the bundle class matching
     * 'dtc_queue.default_manager' (the plain Run model class for any other type).
     *
     * @param ContainerBuilder $container
     *
     * @return mixed|string
     *
     * @throws \Exception if the class cannot be found or is not a Dtc\QueueBundle\Model\Run
     */
    public function getRunArchiveClass(ContainerBuilder $container)
    {
        $runArchiveClass = $container->hasParameter('dtc_queue.class_run_archive') ? $container->getParameter('dtc_queue.class_run_archive') : null;
        if (!$runArchiveClass) {
            switch ($container->getParameter('dtc_queue.default_manager')) {
                case 'mongodb':
                    $runArchiveClass = 'Dtc\\QueueBundle\\Document\\RunArchive';
                    break;
                case 'orm':
                    $runArchiveClass = 'Dtc\\QueueBundle\\Entity\\RunArchive';
                    break;
                default:
                    $runArchiveClass = 'Dtc\\QueueBundle\\Model\\Run';
            }
        }

        $this->assertClassExtends($runArchiveClass, 'RunArchive', 'Dtc\\QueueBundle\\Model\\Run');

        return $runArchiveClass;
    }

    /**
     * Determines the job archive class based on the queue manager type.
     *
     * Uses the 'dtc_queue.class_job_archive' parameter when it is non-empty,
     * otherwise the bundle class for the 'mongodb' or 'orm' manager. For any
     * other manager type no archive class is chosen and the (empty) parameter
     * value is returned unchanged, without validation.
     *
     * @param ContainerBuilder $container
     *
     * @return mixed|string
     *
     * @throws \Exception if a class was determined but cannot be found or is not
     *                    a Dtc\QueueBundle\Model\Job
     */
    public function getJobClassArchive(ContainerBuilder $container)
    {
        $jobArchiveClass = $container->getParameter('dtc_queue.class_job_archive');
        if (!$jobArchiveClass) {
            switch ($container->getParameter('dtc_queue.default_manager')) {
                case 'mongodb':
                    $jobArchiveClass = 'Dtc\\QueueBundle\\Document\\JobArchive';
                    break;
                case 'orm':
                    $jobArchiveClass = 'Dtc\\QueueBundle\\Entity\\JobArchive';
                    break;
            }
        }

        if ($jobArchiveClass) {
            $this->assertClassExtends($jobArchiveClass, 'JobArchive', 'Dtc\\QueueBundle\\Model\\Job');
        }

        return $jobArchiveClass;
    }

    /**
     * Appends one value per entry of $defaults to $arguments, in order.
     *
     * The configured option is used when it is set and not null, otherwise the
     * default from $defaults.
     *
     * @param array $arguments argument list being built (modified in place)
     * @param array $options   configured options, keyed by option name
     * @param array $defaults  option name => default value, in positional order
     */
    private function appendOptions(array &$arguments, array $options, array $defaults)
    {
        foreach ($defaults as $name => $default) {
            $arguments[] = isset($options[$name]) ? $options[$name] : $default;
        }
    }

    /**
     * Verifies that $class exists and is (or derives from) $baseClass.
     *
     * The check works on the class name only; the class is never instantiated,
     * so its constructor is not run during container compilation.
     *
     * @param string $class     fully qualified class name to validate
     * @param string $label     short name used in the "Can't find" message
     * @param string $baseClass fully qualified name of the required base class
     *
     * @throws \Exception if the class cannot be found or has the wrong type
     */
    private function assertClassExtends($class, $label, $baseClass)
    {
        if (!class_exists($class)) {
            throw new \Exception("Can't find $label class $class");
        }

        if (!is_a($class, $baseClass, true)) {
            throw new \Exception("$class must be instance of (or derived from) $baseClass");
        }
    }
}
342
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.