1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\DependencyInjection\Compiler; |
4
|
|
|
|
5
|
|
|
use Dtc\GridBundle\DependencyInjection\Compiler\GridSourceCompilerPass; |
6
|
|
|
use Dtc\QueueBundle\Model\Job; |
7
|
|
|
use Dtc\QueueBundle\Model\JobTiming; |
8
|
|
|
use Dtc\QueueBundle\Model\Run; |
9
|
|
|
use Dtc\QueueBundle\Exception\ClassNotFoundException; |
10
|
|
|
use Dtc\QueueBundle\Exception\ClassNotSubclassException; |
11
|
|
|
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
12
|
|
|
use Symfony\Component\DependencyInjection\Alias; |
13
|
|
|
use Symfony\Component\DependencyInjection\ContainerBuilder; |
14
|
|
|
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface; |
15
|
|
|
use Symfony\Component\DependencyInjection\Definition; |
16
|
|
|
use Symfony\Component\DependencyInjection\Reference; |
17
|
|
|
|
18
|
|
|
class WorkerCompilerPass implements CompilerPassInterface |
19
|
|
|
{ |
20
|
4 |
|
/**
 * Wires the queue bundle's services together while the container is compiled.
 *
 * Does nothing when the 'dtc_queue.manager.worker' definition is absent. Otherwise it
 * sets up the manager aliases, resolves and stores the job, job archive, job timing,
 * run and run archive class parameters, registers tagged workers and event subscribers,
 * aliases the Doctrine managers and adds the live-job grid sources.
 *
 * @param ContainerBuilder $container
 */
public function process(ContainerBuilder $container)
{
    if (false === $container->hasDefinition('dtc_queue.manager.worker')) {
        return;
    }

    $this->setupAliases($container);

    $definition = $container->getDefinition('dtc_queue.manager.worker');

    $jobClass = $this->getJobClass($container);
    $jobArchiveClass = $this->getJobClassArchive($container);
    $container->setParameter('dtc_queue.class.job', $jobClass);
    $container->setParameter('dtc_queue.class.job_archive', $jobArchiveClass);

    // Both helpers return the NAME of the parameter holding the manager type, not its value.
    $managerType = $this->getRunManagerType($container);
    $jobTimingManagerType = $this->getJobTimingManagerType($container);
    $container->setParameter('dtc_queue.class.job_timing', $this->getClass(
        $container,
        $jobTimingManagerType,
        'job_timing',
        'JobTiming',
        JobTiming::class
    ));
    $container->setParameter('dtc_queue.class.run', $this->getClass($container, $managerType, 'run', 'Run', Run::class));
    $container->setParameter('dtc_queue.class.run_archive', $this->getClass($container, $managerType, 'run_archive', 'RunArchive', Run::class));

    // setupTaggedServices() declares two parameters; the job class is not needed there.
    $this->setupTaggedServices($container, $definition);

    $eventDispatcher = $container->getDefinition('dtc_queue.event_dispatcher');
    foreach (array_keys($container->findTaggedServiceIds('dtc_queue.event_subscriber')) as $id) {
        // Pass a Reference (as is done for workers) so the dispatcher receives the
        // registered service rather than an inlined copy of its definition.
        $eventDispatcher->addMethodCall('addSubscriber', [new Reference($id)]);
    }

    $this->setupDoctrineManagers($container);
    $this->addLiveJobs($container);
}
56
|
|
|
|
57
|
|
|
/**
 * Points the generic 'dtc_queue.<type>' alias at the manager selected for that type.
 *
 * The target id is 'dtc_queue.<type>.<defaultManagerType>'. It may be registered either
 * as a service definition or as an alias; in both cases the resulting alias is made public.
 *
 * @param ContainerBuilder $container
 * @param string           $defaultManagerType suffix identifying the manager implementation
 * @param string           $type               alias suffix, e.g. 'manager.job'
 *
 * @throws InvalidConfigurationException when the target id is neither a definition nor an alias
 */
protected function setupAlias(ContainerBuilder $container, $defaultManagerType, $type)
{
    $aliasName = 'dtc_queue.'.$type;
    $targetName = $aliasName.'.'.$defaultManagerType;

    if (!$container->hasDefinition($targetName) && !$container->hasAlias($targetName)) {
        throw new InvalidConfigurationException("No $type manager found for dtc_queue.$type.$defaultManagerType");
    }

    // A definition gets a brand-new alias; an already registered alias object is reused.
    $alias = $container->hasDefinition($targetName)
        ? new Alias($targetName)
        : $container->getAlias($targetName);

    $alias->setPublic(true);
    $container->setAlias($aliasName, $alias);
}
77
|
|
|
|
78
|
4 |
|
/**
 * Creates the 'dtc_queue.manager.job', 'dtc_queue.manager.run' and
 * 'dtc_queue.manager.job_timing' aliases from the configured manager types.
 *
 * @param ContainerBuilder $container
 */
protected function setupAliases(ContainerBuilder $container)
{
    $this->setupAlias($container, $container->getParameter('dtc_queue.manager.job'), 'manager.job');

    // The get*ManagerType() helpers return a parameter name, so it is read in a second step.
    $runParameterName = $this->getRunManagerType($container);
    $this->setupAlias($container, $container->getParameter($runParameterName), 'manager.run');

    $jobTimingParameterName = $this->getJobTimingManagerType($container);
    $this->setupAlias($container, $container->getParameter($jobTimingParameterName), 'manager.job_timing');
}
87
|
|
|
|
88
|
|
|
/**
 * Registers every service tagged 'dtc_queue.worker' with the worker manager.
 *
 * Each tagged service receives a 'setJobManager' call injecting 'dtc_queue.manager.job',
 * and the given definition receives an 'addWorker' call referencing that service.
 *
 * @param ContainerBuilder $container
 * @param Definition       $definition definition that collects the workers via 'addWorker'
 *
 * @throws \InvalidArgumentException when a tagged service does not extend Dtc\QueueBundle\Model\Worker
 */
protected function setupTaggedServices(ContainerBuilder $container, Definition $definition)
{
    $jobManagerRef = array(new Reference('dtc_queue.manager.job'));
    $workerClass = 'Dtc\QueueBundle\Model\Worker';

    // Add each worker to workerManager, make sure each worker has instance to work
    foreach (array_keys($container->findTaggedServiceIds('dtc_queue.worker')) as $id) {
        $worker = $container->getDefinition($id);

        // The class may be given as a '%parameter%' placeholder; resolve it before reflecting.
        $class = $container->getParameterBag()->resolveValue($worker->getClass());

        $refClass = new \ReflectionClass($class);
        if (!$refClass->isSubclassOf($workerClass)) {
            throw new \InvalidArgumentException(sprintf('Service "%s" must extend class "%s".', $id, $workerClass));
        }

        // Give each worker access to job manager
        $worker->addMethodCall('setJobManager', $jobManagerRef);
        $definition->addMethodCall('addWorker', array(new Reference($id)));
    }
}
111
|
|
|
|
112
|
|
|
/**
 * Aliases the configured Doctrine managers under the bundle's own service ids.
 *
 * 'dtc_queue.document_manager' is pointed at the MongoDB ODM document manager named by the
 * 'dtc_queue.odm.document_manager' parameter, and 'dtc_queue.entity_manager' at the ORM
 * entity manager named by 'dtc_queue.orm.entity_manager'. Each alias is only created when
 * the container has the target service.
 *
 * @param ContainerBuilder $container
 */
protected function setupDoctrineManagers(ContainerBuilder $container)
{
    // MongoDB ODM
    $documentManagerName = $container->getParameter('dtc_queue.odm.document_manager');
    $odmServiceId = "doctrine_mongodb.odm.{$documentManagerName}_document_manager";
    $hasOdm = $container->has($odmServiceId);
    if ($hasOdm) {
        $container->setAlias('dtc_queue.document_manager', $odmServiceId);
    }

    // ORM
    $entityManagerName = $container->getParameter('dtc_queue.orm.entity_manager');
    $ormServiceId = "doctrine.orm.{$entityManagerName}_entity_manager";
    $hasOrm = $container->has($ormServiceId);
    if ($hasOrm) {
        $container->setAlias('dtc_queue.entity_manager', $ormServiceId);
    }
}
131
|
|
|
|
132
|
|
|
/**
 * Registers the "jobs waiting" / "jobs running" grid sources matching the job class.
 *
 * For each storage-specific Job class (ODM document, ORM entity) a fresh instance is
 * created; when that instance is an instance of the class configured in
 * 'dtc_queue.class.job', the corresponding pair of grid sources is added.
 *
 * @param ContainerBuilder $container
 */
protected function addLiveJobs(ContainerBuilder $container)
{
    $configuredJob = new \ReflectionClass($container->getParameter('dtc_queue.class.job'));

    // Storage-specific job class => grid sources to register for it (ODM first, then ORM).
    $gridSourcesByJobClass = array(
        \Dtc\QueueBundle\Document\Job::class => array(
            'dtc_queue.grid_source.jobs_waiting.odm',
            'dtc_queue.grid_source.jobs_running.odm',
        ),
        \Dtc\QueueBundle\Entity\Job::class => array(
            'dtc_queue.grid_source.jobs_waiting.orm',
            'dtc_queue.grid_source.jobs_running.orm',
        ),
    );

    foreach ($gridSourcesByJobClass as $storageJobClass => $gridSources) {
        if (!$configuredJob->isInstance(new $storageJobClass())) {
            continue;
        }

        foreach ($gridSources as $gridSource) {
            GridSourceCompilerPass::addGridSource($container, $gridSource);
        }
    }
}
147
|
|
|
|
148
|
|
|
/**
 * Maps a manager type to the bundle sub-namespace that holds its classes.
 *
 * @param string $managerType one of 'odm', 'beanstalkd', 'rabbit_mq', 'orm' or 'redis'
 *
 * @return null|string the sub-namespace name, or null for any other manager type
 */
protected function getDirectory($managerType)
{
    $directory = null;

    switch ($managerType) {
        case 'odm':
            $directory = 'Document';
            break;
        case 'beanstalkd':
            $directory = 'Beanstalkd';
            break;
        case 'rabbit_mq':
            $directory = 'RabbitMQ';
            break;
        case 'orm':
            $directory = 'Entity';
            break;
        case 'redis':
            $directory = 'Redis';
            break;
    }

    return $directory;
}
170
|
|
|
|
171
|
|
|
/**
 * Resolves the job class to use.
 *
 * The 'dtc_queue.class.job' parameter wins when it is set to a non-empty value; otherwise
 * the class is derived from the manager type in 'dtc_queue.manager.job'. The result is
 * passed through testClass() before being returned.
 *
 * @param ContainerBuilder $container
 *
 * @return mixed|string
 *
 * @throws InvalidConfigurationException when no class is configured and the manager type is unknown
 */
protected function getJobClass(ContainerBuilder $container)
{
    $jobClass = $container->getParameter('dtc_queue.class.job');

    if (!$jobClass) {
        $managerType = $container->getParameter('dtc_queue.manager.job');
        $directory = $this->getDirectory($managerType);

        if (!$directory) {
            throw new InvalidConfigurationException('Unknown manager.job type '.$managerType.' - please specify a Job class in the \'class\' configuration parameter');
        }

        $jobClass = 'Dtc\QueueBundle\\'.$directory.'\Job';
    }

    $this->testClass($jobClass, Job::class);

    return $jobClass;
}
195
|
|
|
|
196
|
4 |
|
/**
 * Returns the NAME of the parameter that holds the run manager type.
 *
 * @param ContainerBuilder $container
 *
 * @return string 'dtc_queue.manager.run' when that parameter exists, otherwise 'dtc_queue.manager.job'
 */
protected function getRunManagerType(ContainerBuilder $container)
{
    return $container->hasParameter('dtc_queue.manager.run')
        ? 'dtc_queue.manager.run'
        : 'dtc_queue.manager.job';
}
205
|
|
|
|
206
|
4 |
|
/**
 * Returns the NAME of the parameter that holds the job timing manager type.
 *
 * @param ContainerBuilder $container
 *
 * @return string 'dtc_queue.manager.job_timing' when that parameter exists, otherwise
 *                whatever getRunManagerType() returns
 */
protected function getJobTimingManagerType(ContainerBuilder $container)
{
    $fallback = $this->getRunManagerType($container);

    return $container->hasParameter('dtc_queue.manager.job_timing')
        ? 'dtc_queue.manager.job_timing'
        : $fallback;
}
215
|
|
|
|
216
|
|
|
/**
 * Resolves the class to use for a given model type (e.g. 'run', 'job_timing').
 *
 * A non-empty 'dtc_queue.class.<type>' parameter wins. Otherwise the class comes from the
 * Document namespace for 'odm', the Entity namespace for 'orm', and falls back to
 * $baseClass for every other manager type. The result is passed through testClass().
 *
 * @param ContainerBuilder $container
 * @param string           $managerType name of the parameter that holds the manager type
 * @param string           $type        suffix of the 'dtc_queue.class.*' parameter to consult
 * @param string           $className   short class name appended to the storage namespace
 * @param string           $baseClass   fallback class, also handed to testClass() as the parent
 *
 * @return mixed|string
 */
protected function getClass(ContainerBuilder $container, $managerType, $type, $className, $baseClass)
{
    $parameterName = 'dtc_queue.class.'.$type;

    $resolvedClass = null;
    if ($container->hasParameter($parameterName)) {
        $resolvedClass = $container->getParameter($parameterName);
    }

    if (!$resolvedClass) {
        switch ($container->getParameter($managerType)) {
            case 'odm':
                $resolvedClass = 'Dtc\\QueueBundle\\Document\\'.$className;
                break;
            case 'orm':
                $resolvedClass = 'Dtc\\QueueBundle\\Entity\\'.$className;
                break;
            default:
                $resolvedClass = $baseClass;
        }
    }

    $this->testClass($resolvedClass, $baseClass);

    return $resolvedClass;
}
241
|
|
|
|
242
|
|
|
/**
 * Verifies that a class exists and is $parent itself or derived from it.
 *
 * The check works on class names only, so the class is never instantiated.
 *
 * @param string $className class to validate
 * @param string $parent    class that $className must be or extend
 *
 * @throws ClassNotFoundException    when $className cannot be found
 * @throws ClassNotSubclassException when $className is neither $parent nor derived from it
 */
protected function testClass($className, $parent)
{
    if (!class_exists($className)) {
        throw new ClassNotFoundException("Can't find class $className");
    }

    // Compare against $parent; comparing an instance of $className with $className
    // itself is always true and would let unrelated classes through.
    if (!is_a($className, $parent, true)) {
        throw new ClassNotSubclassException("$className must be instance of (or derived from) $parent");
    }
}
257
|
|
|
|
258
|
|
|
/**
 * Resolves the job archive class to use.
 *
 * A non-empty 'dtc_queue.class.job_archive' parameter wins. Otherwise the class is derived
 * from 'dtc_queue.manager.job' for the 'odm' and 'orm' manager types; for any other manager
 * type the (empty) parameter value is returned unchanged. A resolved class is passed
 * through testClass() before being returned.
 *
 * @param ContainerBuilder $container
 *
 * @return mixed|string
 *
 * @throws ClassNotFoundException
 * @throws ClassNotSubclassException
 */
protected function getJobClassArchive(ContainerBuilder $container)
{
    $jobArchiveClass = $container->getParameter('dtc_queue.class.job_archive');
    if (!$jobArchiveClass) {
        switch ($container->getParameter('dtc_queue.manager.job')) {
            case 'odm':
                $jobArchiveClass = 'Dtc\\QueueBundle\\Document\\JobArchive';
                break;
            case 'orm':
                $jobArchiveClass = 'Dtc\\QueueBundle\\Entity\\JobArchive';
                break;
        }
    }

    // Same emptiness test as above: only validate when a class was actually resolved,
    // so an empty string or false is not handed to testClass() as a class name.
    if ($jobArchiveClass) {
        $this->testClass($jobArchiveClass, Job::class);
    }

    return $jobArchiveClass;
}
287
|
|
|
} |
288
|
|
|
|
This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.
If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this is known to happen is WordPress.
In this case you can add the `@ignore` PhpDoc annotation to the duplicate definition and it will be ignored.