1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\ORM; |
4
|
|
|
|
5
|
|
|
use Doctrine\ORM\EntityManager; |
6
|
|
|
use Doctrine\ORM\EntityRepository; |
7
|
|
|
use Doctrine\ORM\QueryBuilder; |
8
|
|
|
use Dtc\QueueBundle\Doctrine\BaseJobManager; |
9
|
|
|
use Dtc\QueueBundle\Entity\Job; |
10
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
11
|
|
|
use Dtc\QueueBundle\Model\RetryableJob; |
12
|
|
|
use Symfony\Component\Process\Exception\LogicException; |
13
|
|
|
|
14
|
|
|
class JobManager extends BaseJobManager |
15
|
|
|
{ |
16
|
|
|
use CommonTrait; |
17
|
|
|
protected static $saveInsertCalled = null; |
18
|
|
|
protected static $resetInsertCalled = null; |
19
|
|
|
|
20
|
|
|
public function countJobsByStatus($objectName, $status, $workerName = null, $method = null) |
21
|
|
|
{ |
22
|
|
|
/** @var EntityManager $objectManager */ |
23
|
|
|
$objectManager = $this->getObjectManager(); |
24
|
|
|
|
25
|
|
|
$queryBuilder = $objectManager |
26
|
|
|
->createQueryBuilder() |
27
|
|
|
->select('count(a.id)') |
28
|
|
|
->from($objectName, 'a') |
29
|
|
|
->where('a.status = :status'); |
30
|
|
|
|
31
|
|
|
if (null !== $workerName) { |
32
|
|
|
$queryBuilder->andWhere('a.workerName = :workerName') |
33
|
|
|
->setParameter(':workerName', $workerName); |
34
|
|
|
} |
35
|
|
|
|
36
|
|
|
if (null !== $method) { |
37
|
|
|
$queryBuilder->andWhere('a.method = :method') |
38
|
|
|
->setParameter(':method', $workerName); |
39
|
|
|
} |
40
|
|
|
|
41
|
|
|
$count = $queryBuilder->setParameter(':status', $status) |
42
|
|
|
->getQuery()->getSingleScalarResult(); |
43
|
|
|
|
44
|
|
|
if (!$count) { |
45
|
|
|
return 0; |
46
|
|
|
} |
47
|
|
|
|
48
|
|
|
return $count; |
49
|
|
|
} |
50
|
|
|
|
51
|
|
|
/** |
52
|
|
|
* @param string|null $workerName |
53
|
|
|
* @param string|null $method |
54
|
|
|
* |
55
|
|
|
* @return int Count of jobs pruned |
56
|
|
|
*/ |
57
|
|
|
public function pruneErroneousJobs($workerName = null, $method = null) |
58
|
|
|
{ |
59
|
|
|
/** @var EntityManager $objectManager */ |
60
|
|
|
$objectManager = $this->getObjectManager(); |
61
|
|
|
$queryBuilder = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j'); |
62
|
|
|
$queryBuilder->where('j.status = :status') |
63
|
|
|
->setParameter(':status', BaseJob::STATUS_ERROR); |
64
|
|
|
|
65
|
|
|
$this->addWorkerNameCriterion($queryBuilder, $workerName, $method); |
66
|
|
|
$query = $queryBuilder->getQuery(); |
67
|
|
|
|
68
|
|
|
return intval($query->execute()); |
69
|
|
|
} |
70
|
|
|
|
71
|
1 |
|
protected function resetSaveOk($function) |
72
|
1 |
|
{ |
73
|
|
|
$objectManager = $this->getObjectManager(); |
74
|
|
|
$splObjectHash = spl_object_hash($objectManager); |
75
|
|
|
|
76
|
|
|
if ('save' === $function) { |
77
|
|
|
$compare = static::$resetInsertCalled; |
78
|
|
|
} else { |
79
|
|
|
$compare = static::$saveInsertCalled; |
80
|
|
|
} |
81
|
|
|
|
82
|
|
|
if ($splObjectHash === $compare) { |
83
|
|
|
// Insert SQL is cached... |
84
|
|
|
$msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)"; |
85
|
|
|
throw new LogicException($msg); |
86
|
|
|
} |
87
|
|
|
|
88
|
|
|
if ('save' === $function) { |
89
|
|
|
static::$saveInsertCalled = spl_object_hash($objectManager); |
90
|
|
|
} else { |
91
|
|
|
static::$resetInsertCalled = spl_object_hash($objectManager); |
92
|
|
|
} |
93
|
|
|
} |
94
|
|
|
|
95
|
|
|
/** |
96
|
|
|
* @param string $workerName |
97
|
|
|
* @param string $method |
98
|
|
|
*/ |
99
|
|
|
protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null) |
100
|
|
|
{ |
101
|
|
|
if (null !== $workerName) { |
102
|
|
|
$queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName); |
103
|
|
|
} |
104
|
|
|
|
105
|
|
|
if (null !== $method) { |
106
|
|
|
$queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method); |
107
|
|
|
} |
108
|
|
|
} |
109
|
|
|
|
110
|
|
|
protected function updateExpired($workerName = null, $method = null) |
111
|
|
|
{ |
112
|
|
|
/** @var EntityManager $objectManager */ |
113
|
|
|
$objectManager = $this->getObjectManager(); |
114
|
|
|
$queryBuilder = $objectManager->createQueryBuilder()->update($this->getObjectName(), 'j'); |
115
|
|
|
$queryBuilder->set('j.status', ':newStatus'); |
116
|
|
|
$queryBuilder->where('j.expiresAt <= :expiresAt') |
117
|
|
|
->setParameter(':expiresAt', new \DateTime()); |
118
|
|
|
$queryBuilder->andWhere('j.status = :status') |
119
|
|
|
->setParameter(':status', BaseJob::STATUS_NEW) |
120
|
|
|
->setParameter(':newStatus', Job::STATUS_EXPIRED); |
121
|
|
|
|
122
|
|
|
$this->addWorkerNameCriterion($queryBuilder, $workerName, $method); |
123
|
|
|
$query = $queryBuilder->getQuery(); |
124
|
|
|
|
125
|
|
|
return intval($query->execute()); |
126
|
|
|
} |
127
|
|
|
|
128
|
|
|
/** |
129
|
|
|
* Removes archived jobs older than $olderThan. |
130
|
|
|
* |
131
|
|
|
* @param \DateTime $olderThan |
132
|
|
|
*/ |
133
|
|
|
public function pruneArchivedJobs(\DateTime $olderThan) |
134
|
|
|
{ |
135
|
|
|
return $this->removeOlderThan($this->getArchiveObjectName(), |
136
|
|
|
'updatedAt', |
137
|
|
|
$olderThan); |
138
|
|
|
} |
139
|
|
|
|
140
|
1 |
|
public function getJobCount($workerName = null, $method = null) |
141
|
|
|
{ |
142
|
|
|
/** @var EntityManager $objectManager */ |
143
|
1 |
|
$objectManager = $this->getObjectManager(); |
144
|
1 |
|
$queryBuilder = $objectManager->createQueryBuilder(); |
145
|
|
|
|
146
|
1 |
|
$queryBuilder = $queryBuilder->select('count(j)')->from($this->getObjectName(), 'j'); |
147
|
|
|
|
148
|
1 |
|
$where = 'where'; |
149
|
1 |
|
if (null !== $workerName) { |
150
|
|
|
if (null !== $method) { |
151
|
|
|
$queryBuilder->where($queryBuilder->expr()->andX( |
152
|
|
|
$queryBuilder->expr()->eq('j.workerName', ':workerName'), |
153
|
|
|
$queryBuilder->expr()->eq('j.method', ':method') |
154
|
|
|
)) |
155
|
|
|
->setParameter(':method', $method); |
156
|
|
|
} else { |
157
|
|
|
$queryBuilder->where('j.workerName = :workerName'); |
158
|
|
|
} |
159
|
|
|
$queryBuilder->setParameter(':workerName', $workerName); |
160
|
|
|
$where = 'andWhere'; |
161
|
1 |
|
} elseif (null !== $method) { |
162
|
|
|
$queryBuilder->where('j.method = :method')->setParameter(':method', $method); |
163
|
|
|
$where = 'andWhere'; |
164
|
|
|
} |
165
|
|
|
|
166
|
1 |
|
$dateTime = new \DateTime(); |
167
|
|
|
// Filter |
168
|
|
|
$queryBuilder |
169
|
1 |
|
->$where($queryBuilder->expr()->orX( |
170
|
1 |
|
$queryBuilder->expr()->isNull('j.whenAt'), |
171
|
1 |
|
$queryBuilder->expr()->lte('j.whenAt', ':whenAt') |
172
|
1 |
|
)) |
173
|
1 |
|
->andWhere($queryBuilder->expr()->orX( |
174
|
1 |
|
$queryBuilder->expr()->isNull('j.expiresAt'), |
175
|
1 |
|
$queryBuilder->expr()->gt('j.expiresAt', ':expiresAt') |
176
|
1 |
|
)) |
177
|
1 |
|
->andWhere('j.locked is NULL') |
178
|
1 |
|
->setParameter(':whenAt', $dateTime) |
179
|
1 |
|
->setParameter(':expiresAt', $dateTime); |
180
|
|
|
|
181
|
1 |
|
$query = $queryBuilder->getQuery(); |
182
|
|
|
|
183
|
1 |
|
return $query->getSingleScalarResult(); |
184
|
|
|
} |
185
|
|
|
|
186
|
|
|
/** |
187
|
|
|
* Get Jobs statuses. |
188
|
|
|
*/ |
189
|
1 |
|
public function getStatus() |
190
|
|
|
{ |
191
|
1 |
|
$result = []; |
192
|
1 |
|
$this->getStatusByEntityName($this->getObjectName(), $result); |
193
|
1 |
|
$this->getStatusByEntityName($this->getArchiveObjectName(), $result); |
194
|
|
|
|
195
|
1 |
|
$finalResult = []; |
196
|
1 |
|
foreach ($result as $key => $item) { |
197
|
|
|
ksort($item); |
198
|
|
|
foreach ($item as $status => $count) { |
199
|
|
|
if (isset($finalResult[$key][$status])) { |
200
|
|
|
$finalResult[$key][$status] += $count; |
201
|
|
|
} else { |
202
|
|
|
$finalResult[$key][$status] = $count; |
203
|
|
|
} |
204
|
|
|
} |
205
|
1 |
|
} |
206
|
|
|
|
207
|
1 |
|
return $finalResult; |
208
|
|
|
} |
209
|
|
|
|
210
|
|
|
/** |
211
|
|
|
* @param string $entityName |
212
|
|
|
*/ |
213
|
1 |
|
protected function getStatusByEntityName($entityName, array &$result) |
214
|
|
|
{ |
215
|
|
|
/** @var EntityManager $objectManager */ |
216
|
1 |
|
$objectManager = $this->getObjectManager(); |
217
|
1 |
|
$result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c') |
218
|
1 |
|
->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult(); |
219
|
|
|
|
220
|
1 |
|
foreach ($result1 as $item) { |
221
|
|
|
$method = $item['workerName'].'->'.$item['method'].'()'; |
222
|
|
|
if (!isset($result[$method])) { |
223
|
|
|
$result[$method] = [BaseJob::STATUS_NEW => 0, |
224
|
|
|
BaseJob::STATUS_RUNNING => 0, |
225
|
|
|
RetryableJob::STATUS_EXPIRED => 0, |
226
|
|
|
RetryableJob::STATUS_MAX_ERROR => 0, |
227
|
|
|
RetryableJob::STATUS_MAX_STALLED => 0, |
228
|
|
|
RetryableJob::STATUS_MAX_RETRIES => 0, |
229
|
|
|
BaseJob::STATUS_SUCCESS => 0, |
230
|
|
|
BaseJob::STATUS_ERROR => 0, ]; |
231
|
|
|
} |
232
|
|
|
$result[$method][$item['status']] += intval($item['c']); |
233
|
1 |
|
} |
234
|
1 |
|
} |
235
|
|
|
|
236
|
|
|
/** |
237
|
|
|
* Get the next job to run (can be filtered by workername and method name). |
238
|
|
|
* |
239
|
|
|
* @param string $workerName |
240
|
|
|
* @param string $methodName |
241
|
|
|
* @param bool $prioritize |
242
|
|
|
* |
243
|
|
|
* @return Job|null |
244
|
|
|
*/ |
245
|
|
|
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
246
|
|
|
{ |
247
|
|
|
$queryBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize); |
|
|
|
|
248
|
|
|
$queryBuilder->setMaxResults(1); |
249
|
|
|
|
250
|
|
|
/** @var QueryBuilder $queryBuilder */ |
251
|
|
|
$query = $queryBuilder->getQuery(); |
252
|
|
|
$jobs = $query->getResult(); |
253
|
|
|
|
254
|
|
|
return $this->takeJob($jobs, $runId); |
255
|
|
|
} |
256
|
|
|
|
257
|
|
|
/** |
258
|
|
|
* @param null $workerName |
259
|
|
|
* @param null $methodName |
260
|
|
|
* @param bool $prioritize |
261
|
|
|
* |
262
|
|
|
* @return QueryBuilder |
263
|
|
|
*/ |
264
|
|
|
public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true) |
265
|
|
|
{ |
266
|
|
|
/** @var EntityRepository $repository */ |
267
|
|
|
$repository = $this->getRepository(); |
268
|
|
|
$queryBuilder = $repository->createQueryBuilder('j'); |
269
|
|
|
$dateTime = new \DateTime(); |
270
|
|
|
$queryBuilder |
271
|
|
|
->select('j.id') |
272
|
|
|
->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW) |
273
|
|
|
->andWhere('j.locked is NULL') |
274
|
|
|
->andWhere($queryBuilder->expr()->orX( |
275
|
|
|
$queryBuilder->expr()->isNull('j.whenAt'), |
276
|
|
|
$queryBuilder->expr()->lte('j.whenAt', ':whenAt') |
277
|
|
|
)) |
278
|
|
|
->andWhere($queryBuilder->expr()->orX( |
279
|
|
|
$queryBuilder->expr()->isNull('j.expiresAt'), |
280
|
|
|
$queryBuilder->expr()->gt('j.expiresAt', ':expiresAt') |
281
|
|
|
)) |
282
|
|
|
->setParameter(':whenAt', $dateTime) |
283
|
|
|
->setParameter(':expiresAt', $dateTime); |
284
|
|
|
|
285
|
|
|
$this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName); |
286
|
|
|
|
287
|
|
|
if ($prioritize) { |
288
|
|
|
$queryBuilder->addOrderBy('j.priority', 'DESC'); |
289
|
|
|
$queryBuilder->addOrderBy('j.whenAt', 'ASC'); |
290
|
|
|
} else { |
291
|
|
|
$queryBuilder->orderBy('j.whenAt', 'ASC'); |
292
|
|
|
} |
293
|
|
|
|
294
|
|
|
return $queryBuilder; |
295
|
|
|
} |
296
|
|
|
|
297
|
|
|
protected function takeJob($jobs, $runId = null) |
298
|
|
|
{ |
299
|
|
|
if (isset($jobs[0]['id'])) { |
300
|
|
|
/** @var EntityRepository $repository */ |
301
|
|
|
$repository = $this->getRepository(); |
302
|
|
|
/** @var QueryBuilder $queryBuilder */ |
303
|
|
|
$queryBuilder = $repository->createQueryBuilder('j'); |
304
|
|
|
$queryBuilder |
305
|
|
|
->update() |
306
|
|
|
->set('j.locked', ':locked') |
307
|
|
|
->setParameter(':locked', true) |
308
|
|
|
->set('j.lockedAt', ':lockedAt') |
309
|
|
|
->setParameter(':lockedAt', new \DateTime()) |
310
|
|
|
->set('j.status', ':status') |
311
|
|
|
->setParameter(':status', BaseJob::STATUS_RUNNING); |
312
|
|
|
if (null !== $runId) { |
313
|
|
|
$queryBuilder |
314
|
|
|
->set('j.runId', ':runId') |
315
|
|
|
->setParameter(':runId', $runId); |
316
|
|
|
} |
317
|
|
|
$queryBuilder->where('j.id = :id'); |
318
|
|
|
$queryBuilder->andWhere('j.locked is NULL'); |
319
|
|
|
$queryBuilder->setParameter(':id', $jobs[0]['id']); |
320
|
|
|
$resultCount = $queryBuilder->getQuery()->execute(); |
321
|
|
|
|
322
|
|
|
if (1 === $resultCount) { |
323
|
|
|
return $repository->find($jobs[0]['id']); |
324
|
|
|
} |
325
|
|
|
} |
326
|
|
|
|
327
|
|
|
return null; |
328
|
|
|
} |
329
|
|
|
|
330
|
|
|
/** |
331
|
|
|
* Tries to update the nearest job as a batch. |
332
|
|
|
* |
333
|
|
|
* @param \Dtc\QueueBundle\Model\Job $job |
334
|
|
|
* |
335
|
|
|
* @return mixed|null |
336
|
|
|
*/ |
337
|
|
|
public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job) |
338
|
|
|
{ |
339
|
|
|
/** @var QueryBuilder $queryBuilder */ |
340
|
|
|
$queryBuilder = $this->getRepository()->createQueryBuilder('j'); |
341
|
|
|
$queryBuilder->select() |
342
|
|
|
->where('j.crcHash = :crcHash') |
343
|
|
|
->andWhere('j.status = :status') |
344
|
|
|
->setParameter(':status', BaseJob::STATUS_NEW) |
345
|
|
|
->setParameter(':crcHash', $job->getCrcHash()) |
346
|
|
|
->orderBy('j.whenAt', 'ASC') |
347
|
|
|
->setMaxResults(1); |
348
|
|
|
$existingJobs = $queryBuilder->getQuery()->execute(); |
349
|
|
|
|
350
|
|
|
if (empty($existingJobs)) { |
351
|
|
|
return null; |
352
|
|
|
} |
353
|
|
|
/** @var Job $existingJob */ |
354
|
|
|
$existingJob = $existingJobs[0]; |
355
|
|
|
|
356
|
|
|
$newPriority = max($job->getPriority(), $existingJob->getPriority()); |
357
|
|
|
$newWhenAt = min($job->getWhenAt(), $existingJob->getWhenAt()); |
358
|
|
|
|
359
|
|
|
$this->updateBatchJob($existingJob, $newPriority, $newWhenAt); |
360
|
|
|
|
361
|
|
|
return $existingJob; |
362
|
|
|
} |
363
|
|
|
|
364
|
|
|
protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenAt) |
365
|
|
|
{ |
366
|
|
|
$existingPriority = $existingJob->getPriority(); |
367
|
|
|
$existingWhenAt = $existingJob->getWhenAt(); |
368
|
|
|
|
369
|
|
|
if ($newPriority !== $existingPriority || $newWhenAt !== $existingWhenAt) { |
370
|
|
|
/** @var EntityRepository $repository */ |
371
|
|
|
$repository = $this->getRepository(); |
372
|
|
|
/** @var QueryBuilder $queryBuilder */ |
373
|
|
|
$queryBuilder = $repository->createQueryBuilder('j'); |
374
|
|
|
$queryBuilder->update(); |
375
|
|
|
if ($newPriority !== $existingPriority) { |
376
|
|
|
$existingJob->setPriority($newPriority); |
377
|
|
|
$queryBuilder->set('j.priority', ':priority') |
378
|
|
|
->setParameter(':priority', $newPriority); |
379
|
|
|
} |
380
|
|
|
if ($newWhenAt !== $existingWhenAt) { |
381
|
|
|
$existingJob->setWhenAt($newWhenAt); |
382
|
|
|
$queryBuilder->set('j.whenAt', ':whenAt') |
383
|
|
|
->setParameter(':whenAt', $newWhenAt); |
384
|
|
|
} |
385
|
|
|
$queryBuilder->where('j.id = :id'); |
386
|
|
|
$queryBuilder->andWhere('j.locked is NULL'); |
387
|
|
|
$queryBuilder->setParameter(':id', $existingJob->getId()); |
388
|
|
|
$queryBuilder->getQuery()->execute(); |
389
|
|
|
} |
390
|
|
|
|
391
|
|
|
return $existingJob; |
392
|
|
|
} |
393
|
|
|
} |
394
|
|
|
|
This check looks at variables that have been passed in as parameters and are passed out again to other methods.
If the outgoing method call has stricter type requirements than the method itself, an issue is raised.
An additional type check may prevent trouble.