1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
/* |
4
|
|
|
* Copyright 2012 Johannes M. Schmitt <[email protected]> |
5
|
|
|
* |
6
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
7
|
|
|
* you may not use this file except in compliance with the License. |
8
|
|
|
* You may obtain a copy of the License at |
9
|
|
|
* |
10
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0 |
11
|
|
|
* |
12
|
|
|
* Unless required by applicable law or agreed to in writing, software |
13
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, |
14
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
15
|
|
|
* See the License for the specific language governing permissions and |
16
|
|
|
* limitations under the License. |
17
|
|
|
*/ |
18
|
|
|
|
19
|
|
|
namespace JMS\JobQueueBundle\Entity\Repository; |
20
|
|
|
|
21
|
|
|
use Doctrine\Common\Collections\ArrayCollection; |
22
|
|
|
use Doctrine\Common\Util\ClassUtils; |
23
|
|
|
use Doctrine\DBAL\Connection; |
24
|
|
|
use Doctrine\ORM\EntityManager; |
25
|
|
|
use Doctrine\ORM\Query\Parameter; |
26
|
|
|
use Doctrine\ORM\Query\ResultSetMappingBuilder; |
27
|
|
|
use Doctrine\Persistence\ManagerRegistry as PersistenceManagerRegistry; |
28
|
|
|
use JMS\JobQueueBundle\Entity\Job; |
29
|
|
|
use JMS\JobQueueBundle\Event\StateChangeEvent; |
30
|
|
|
use JMS\JobQueueBundle\Retry\ExponentialRetryScheduler; |
31
|
|
|
use JMS\JobQueueBundle\Retry\RetryScheduler; |
32
|
|
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
33
|
|
|
|
34
|
|
|
/**
 * Looks up, locks and closes Job entities.
 *
 * All persistence goes through the entity manager that the injected registry
 * returns for the Job class (see getJobManager()).
 */
class JobManager
{
    /** @var EventDispatcherInterface Receives a StateChangeEvent before a job is closed. */
    private $dispatcher;

    /** @var PersistenceManagerRegistry Used to resolve entity managers by class name. */
    private $registry;

    /** @var RetryScheduler|null Created lazily in closeJobInternal() when not injected. */
    private $retryScheduler;

    /**
     * @param PersistenceManagerRegistry $managerRegistry registry used to resolve entity managers
     * @param EventDispatcherInterface   $eventDispatcher dispatcher notified about state changes
     * @param RetryScheduler|null        $retryScheduler  optional scheduler for retry jobs; when
     *                                                    omitted an ExponentialRetryScheduler is
     *                                                    created on first use
     */
    public function __construct(PersistenceManagerRegistry $managerRegistry, EventDispatcherInterface $eventDispatcher, ?RetryScheduler $retryScheduler = null)
    {
        $this->registry = $managerRegistry;
        $this->dispatcher = $eventDispatcher;
        $this->retryScheduler = $retryScheduler;
    }

    /**
     * Returns the first pending job that is startable and could be locked for the given worker.
     *
     * Every candidate that is skipped has its id appended to $excludedIds (passed by
     * reference), so the caller sees which jobs were rejected.
     *
     * @param string   $workerName       value written to the job's workerName column
     * @param int[]    $excludedIds      ids to skip; extended in place
     * @param string[] $excludedQueues   queues to skip
     * @param string[] $restrictedQueues when non-empty, only these queues are considered
     *
     * @return Job|null the locked job, or null when no candidate is left
     */
    public function findStartableJob($workerName, array &$excludedIds = array(), $excludedQueues = array(), $restrictedQueues = array())
    {
        while (null !== $job = $this->findPendingJob($excludedIds, $excludedQueues, $restrictedQueues)) {
            if ($job->isStartable() && $this->acquireLock($workerName, $job)) {
                return $job;
            }

            $excludedIds[] = $job->getId();

            // We do not want to have non-startable jobs floating around in
            // cache as they might be changed by another process. So, better
            // re-fetch them when they are not excluded anymore.
            $this->getJobManager()->getUnitOfWork()->detach($job); // ! detach is deprecated and has no replacement
        }

        return null;
    }

    /**
     * Tries to claim the job for a worker with a conditional UPDATE.
     *
     * The statement only matches while workerName is still NULL, so the number of
     * affected rows tells whether this call claimed the job.
     *
     * @param string $workerName
     *
     * @return bool true when the row was updated (the entity is updated as well)
     */
    private function acquireLock($workerName, Job $job)
    {
        $affectedRows = $this->getJobManager()->getConnection()->executeStatement(
            "UPDATE jms_jobs SET workerName = :worker WHERE id = :id AND workerName IS NULL",
            [
                'worker' => $workerName,
                'id' => $job->getId(),
            ]
        );

        if ($affectedRows > 0) {
            $job->setWorkerName($workerName);

            return true;
        }

        return false;
    }

    /**
     * Returns all jobs linked to the given entity through jms_job_related_entities.
     *
     * @param object $relatedEntity
     *
     * @return Job[]
     */
    public function findAllForRelatedEntity($relatedEntity)
    {
        [$relClass, $relId] = $this->getRelatedEntityIdentifier($relatedEntity);

        $rsm = new ResultSetMappingBuilder($this->getJobManager());
        $rsm->addRootEntityFromClassMetadata(Job::class, 'j');

        return $this->getJobManager()->createNativeQuery("SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId", $rsm)
            ->setParameter('relClass', $relClass)
            ->setParameter('relId', $relId)
            ->getResult();
    }

    /**
     * Shortcut for findJobForRelatedEntity() restricted to the running, pending and new states.
     *
     * @param string $command
     * @param object $relatedEntity
     *
     * @return Job|null
     */
    public function findOpenJobForRelatedEntity($command, $relatedEntity)
    {
        return $this->findJobForRelatedEntity($command, $relatedEntity, [Job::STATE_RUNNING, Job::STATE_PENDING, Job::STATE_NEW]);
    }

    /**
     * Returns the job for a command that is linked to the given entity.
     *
     * @param string   $command
     * @param object   $relatedEntity
     * @param string[] $states        when non-empty, only jobs in one of these states match
     *
     * @return Job|null the result of getOneOrNullResult() for the generated query
     */
    public function findJobForRelatedEntity($command, $relatedEntity, array $states = array())
    {
        [$relClass, $relId] = $this->getRelatedEntityIdentifier($relatedEntity);

        $rsm = new ResultSetMappingBuilder($this->getJobManager());
        $rsm->addRootEntityFromClassMetadata(Job::class, 'j');

        $sql = "SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId AND j.command = :command";
        $params = new ArrayCollection();
        $params->add(new Parameter('command', $command));
        $params->add(new Parameter('relClass', $relClass));
        $params->add(new Parameter('relId', $relId));

        if (!empty($states)) {
            $sql .= " AND j.state IN (:states)";
            $params->add(new Parameter('states', $states, Connection::PARAM_STR_ARRAY));
        }

        return $this->getJobManager()->createNativeQuery($sql, $rsm)
            ->setParameters($params)
            ->getOneOrNullResult();
    }

    /**
     * Builds the (class name, JSON-encoded identifier) pair used to look up related entities.
     *
     * @param object $entity
     *
     * @return array{0: string, 1: string|false} class name and json_encode()d identifier values
     *
     * @throws \RuntimeException         when $entity is not an object
     * @throws \InvalidArgumentException when the entity's identifier values are empty
     */
    private function getRelatedEntityIdentifier($entity)
    {
        if (!is_object($entity)) {
            throw new \RuntimeException('$entity must be an object.');
        }

        // Initialise proxies before their identifier values are read.
        if ($entity instanceof \Doctrine\Persistence\Proxy) {
            $entity->__load();
        }

        $relClass = ClassUtils::getClass($entity);
        $relId = $this->registry->getManagerForClass($relClass)->getMetadataFactory()
            ->getMetadataFor($relClass)->getIdentifierValues($entity);
        // The sort order is part of the encoded value compared against related_id,
        // so it must not be changed.
        asort($relId);

        if (empty($relId)) {
            throw new \InvalidArgumentException(sprintf('The identifier for entity of class "%s" was empty.', $relClass));
        }

        return [$relClass, json_encode($relId)];
    }

    /**
     * Returns the next unlocked pending job whose executeAfter lies in the past.
     *
     * Candidates are ordered by priority and then id, both ascending, and only the
     * first one is fetched.
     *
     * @param int[]    $excludedIds      job ids to skip
     * @param string[] $excludedQueues   queues to skip
     * @param string[] $restrictedQueues when non-empty, only these queues are considered
     *
     * @return Job|null
     */
    public function findPendingJob(array $excludedIds = array(), array $excludedQueues = array(), array $restrictedQueues = array())
    {
        $qb = $this->getJobManager()->createQueryBuilder();
        $qb->select('j')->from(Job::class, 'j')
            ->orderBy('j.priority', 'ASC')
            ->addOrderBy('j.id', 'ASC');

        $conditions = [];

        $conditions[] = $qb->expr()->isNull('j.workerName');

        $conditions[] = $qb->expr()->lt('j.executeAfter', ':now');
        $qb->setParameter('now', new \DateTime(), 'datetime');

        $conditions[] = $qb->expr()->eq('j.state', ':state');
        $qb->setParameter('state', Job::STATE_PENDING);

        if (!empty($excludedIds)) {
            $conditions[] = $qb->expr()->notIn('j.id', ':excludedIds');
            $qb->setParameter('excludedIds', $excludedIds, Connection::PARAM_INT_ARRAY);
        }

        if (!empty($excludedQueues)) {
            $conditions[] = $qb->expr()->notIn('j.queue', ':excludedQueues');
            $qb->setParameter('excludedQueues', $excludedQueues, Connection::PARAM_STR_ARRAY);
        }

        if (!empty($restrictedQueues)) {
            $conditions[] = $qb->expr()->in('j.queue', ':restrictedQueues');
            $qb->setParameter('restrictedQueues', $restrictedQueues, Connection::PARAM_STR_ARRAY);
        }

        $qb->where($qb->expr()->andX(...$conditions));

        return $qb->getQuery()->setMaxResults(1)->getOneOrNullResult();
    }

    /**
     * Moves a job (and the jobs affected by it) into a final state inside one transaction.
     *
     * @param string $finalState one of the Job::STATE_* values handled by closeJobInternal()
     *
     * @throws \Throwable any failure is re-thrown after the transaction was rolled back
     */
    public function closeJob(Job $job, $finalState)
    {
        $em = $this->getJobManager();
        $connection = $em->getConnection();
        $visited = [];

        $connection->beginTransaction();
        try {
            $this->closeJobInternal($job, $finalState, $visited);
            $em->flush();
            $connection->commit();
        } catch (\Throwable $ex) {
            $connection->rollBack();

            throw $ex;
        }

        // Clean-up entity manager to allow for garbage collection to kick in.
        // This runs after the try block on purpose: the transaction is already
        // committed here, so a failure must not end up in the rollback above.
        foreach ($visited as $visitedJob) {
            // If the job is an original job which is now being retried, let's
            // not remove it just yet.
            if (!$visitedJob->isClosedNonSuccessful() || $visitedJob->isRetryJob()) {
                continue;
            }

            $em->getUnitOfWork()->detach($visitedJob); // ! detach is deprecated and has no replacement
        }
    }

    /**
     * Applies a final state to one job and recurses into the jobs that depend on it.
     *
     * Before the state is applied a StateChangeEvent is dispatched (for retry jobs and
     * for jobs without retry jobs); the state reported by the event afterwards is the
     * one that gets applied.
     *
     * @param string $finalState requested final state
     * @param Job[]  $visited    jobs already handled in this run; extended in place
     *
     * @throws \LogicException when the resulting state is not handled below
     */
    private function closeJobInternal(Job $job, $finalState, array &$visited = array())
    {
        if (in_array($job, $visited, true)) {
            return;
        }
        $visited[] = $job;

        if ($job->isInFinalState()) {
            return;
        }

        if (null !== $this->dispatcher && ($job->isRetryJob() || 0 === count($job->getRetryJobs()))) {
            $event = new StateChangeEvent($job, $finalState);
            $this->dispatcher->dispatch($event);
            $finalState = $event->getNewState();
        }

        switch ($finalState) {
            case Job::STATE_CANCELED:
                $job->setState(Job::STATE_CANCELED);
                $this->getJobManager()->persist($job);

                if ($job->isRetryJob()) {
                    $this->closeJobInternal($job->getOriginalJob(), Job::STATE_CANCELED, $visited);

                    return;
                }

                foreach ($this->findIncomingDependencies($job) as $dep) {
                    $this->closeJobInternal($dep, Job::STATE_CANCELED, $visited);
                }

                return;

            case Job::STATE_FAILED:
            case Job::STATE_TERMINATED:
            case Job::STATE_INCOMPLETE:
                if ($job->isRetryJob()) {
                    $job->setState($finalState);
                    $this->getJobManager()->persist($job);

                    // NOTE(review): unlike the CANCELED branch, $visited is not passed on
                    // here, so jobs closed by this call are not tracked by closeJob().
                    // Confirm whether that is intended before changing it.
                    $this->closeJobInternal($job->getOriginalJob(), $finalState);

                    return;
                }

                // The original job has failed, and we are allowed to retry it.
                if ($job->isRetryAllowed()) {
                    $retryJob = new Job($job->getCommand(), $job->getArgs(), true, $job->getQueue(), $job->getPriority());
                    $retryJob->setMaxRuntime($job->getMaxRuntime());

                    if ($this->retryScheduler === null) {
                        $this->retryScheduler = new ExponentialRetryScheduler(5);
                    }

                    $retryJob->setExecuteAfter($this->retryScheduler->scheduleNextRetry($job));

                    $job->addRetryJob($retryJob);
                    $this->getJobManager()->persist($retryJob);
                    $this->getJobManager()->persist($job);

                    return;
                }

                $job->setState($finalState);
                $this->getJobManager()->persist($job);

                // The original job has failed, and no retries are allowed.
                foreach ($this->findIncomingDependencies($job) as $dep) {
                    // This is a safe-guard to avoid blowing up if there is a database inconsistency.
                    if (!$dep->isPending() && !$dep->isNew()) {
                        continue;
                    }

                    $this->closeJobInternal($dep, Job::STATE_CANCELED, $visited);
                }

                return;

            case Job::STATE_FINISHED:
                if ($job->isRetryJob()) {
                    $job->getOriginalJob()->setState($finalState);
                    $this->getJobManager()->persist($job->getOriginalJob());
                }
                $job->setState($finalState);
                $this->getJobManager()->persist($job);

                return;

            default:
                throw new \LogicException(sprintf('Non allowed state "%s" in closeJobInternal().', $finalState));
        }
    }

    /**
     * Returns the jobs that depend on the given job, with their own dependencies fetch-joined.
     *
     * @return Job[]
     */
    public function findIncomingDependencies(Job $job)
    {
        $jobIds = $this->getJobIdsOfIncomingDependencies($job);
        if (empty($jobIds)) {
            return [];
        }

        return $this->getJobManager()->createQuery(sprintf('SELECT j, d FROM %s j LEFT JOIN j.dependencies d WHERE j.id IN (:ids)', Job::class))
            ->setParameter('ids', $jobIds)
            ->getResult();
    }

    /**
     * Returns the jobs that depend on the given job, without joining their dependencies.
     *
     * @return Job[]
     */
    public function getIncomingDependencies(Job $job)
    {
        $jobIds = $this->getJobIdsOfIncomingDependencies($job);
        if (empty($jobIds)) {
            return [];
        }

        return $this->getJobManager()->createQuery(sprintf('SELECT j FROM %s j WHERE j.id IN (:ids)', Job::class))
            ->setParameter('ids', $jobIds)
            ->getResult();
    }

    /**
     * Reads the source_job_id values of all jms_job_dependencies rows pointing at the job.
     *
     * @return array list of source job ids as returned by the database driver
     */
    private function getJobIdsOfIncomingDependencies(Job $job)
    {
        // fetchFirstColumn() replaces fetchAll(\PDO::FETCH_COLUMN), which no longer
        // exists on the result object of newer DBAL versions.
        return $this->getJobManager()->getConnection()
            ->executeQuery("SELECT source_job_id FROM jms_job_dependencies WHERE dest_job_id = :id", ['id' => $job->getId()])
            ->fetchFirstColumn();
    }

    /**
     * Returns the most recently closed terminated/failed jobs that have no original job.
     *
     * @param int $nbJobs maximum number of jobs to return
     *
     * @return Job[]
     */
    public function findLastJobsWithError($nbJobs = 10)
    {
        return $this->getJobManager()->createQuery(sprintf('SELECT j FROM %s j WHERE j.state IN (:errorStates) AND j.originalJob IS NULL ORDER BY j.closedAt DESC', Job::class))
            ->setParameter('errorStates', [Job::STATE_TERMINATED, Job::STATE_FAILED])
            ->setMaxResults($nbJobs)
            ->getResult();
    }

    /**
     * Returns the distinct queue names of all running, new or pending jobs.
     *
     * @return string[]
     */
    public function getAvailableQueueList()
    {
        $queues = $this->getJobManager()->createQuery(sprintf('SELECT DISTINCT j.queue FROM %s j WHERE j.state IN (:availableStates) GROUP BY j.queue', Job::class))
            ->setParameter('availableStates', [Job::STATE_RUNNING, Job::STATE_NEW, Job::STATE_PENDING])
            ->getResult();

        // Each row is an array with a single "queue" key; flatten it to a plain list.
        return array_column($queues, 'queue');
    }

    /**
     * Tells whether the queue has at least one running, new or pending job.
     *
     * The query is limited to a single row with a single selected column, so the
     * value is 1 when such a job exists and 0 otherwise.
     *
     * @param string $jobQueue
     *
     * @return int
     */
    public function getAvailableJobsForQueueCount($jobQueue)
    {
        $result = $this->getJobManager()->createQuery(sprintf('SELECT j.queue FROM %s j WHERE j.state IN (:availableStates) AND j.queue = :queue', Job::class))
            ->setParameter('availableStates', [Job::STATE_RUNNING, Job::STATE_NEW, Job::STATE_PENDING])
            ->setParameter('queue', $jobQueue)
            ->setMaxResults(1)
            ->getOneOrNullResult();

        // getOneOrNullResult() yields null when nothing matches; count(null) is an error on PHP 8.
        return null === $result ? 0 : count($result);
    }

    /**
     * Returns the entity manager that the registry reports for the Job class.
     */
    private function getJobManager(): EntityManager
    {
        return $this->registry->getManagerForClass(Job::class);
    }
}
384
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.