1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\ORM; |
4
|
|
|
|
5
|
|
|
use Doctrine\ORM\EntityManager; |
6
|
|
|
use Doctrine\ORM\EntityRepository; |
7
|
|
|
use Doctrine\ORM\QueryBuilder; |
8
|
|
|
use Dtc\QueueBundle\Doctrine\DoctrineJobManager; |
9
|
|
|
use Dtc\QueueBundle\Entity\Job; |
10
|
|
|
use Dtc\QueueBundle\Exception\UnsupportedException; |
11
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
12
|
|
|
use Dtc\QueueBundle\Model\RetryableJob; |
13
|
|
|
use Dtc\QueueBundle\Model\StallableJob; |
14
|
|
|
use Dtc\QueueBundle\Util\Util; |
15
|
|
|
use Symfony\Component\Process\Exception\LogicException; |
16
|
|
|
|
17
|
|
|
class JobManager extends DoctrineJobManager |
18
|
|
|
{ |
19
|
|
|
use CommonTrait; |
20
|
|
|
protected static $saveInsertCalled = null; |
21
|
|
|
protected static $resetInsertCalled = null; |
22
|
|
|
|
23
|
3 |
|
/**
 * Counts the rows of an entity class that are in a given status, optionally
 * narrowed to a single worker and/or a single method.
 *
 * @param string      $objectName Entity class name the count query selects from
 * @param string      $status     Status value the rows must have
 * @param string|null $workerName When not null, only rows with this workerName are counted
 * @param string|null $method     When not null, only rows with this method are counted
 *
 * @return int|string The scalar produced by the count query, or 0 when that scalar is falsy
 */
public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
{
    /** @var EntityManager $objectManager */
    $objectManager = $this->getObjectManager();

    $queryBuilder = $objectManager
        ->createQueryBuilder()
        ->select('count(a.id)')
        ->from($objectName, 'a')
        ->where('a.status = :status')
        ->setParameter(':status', $status);

    if (null !== $workerName) {
        $queryBuilder->andWhere('a.workerName = :workerName')
            ->setParameter(':workerName', $workerName);
    }

    if (null !== $method) {
        // The method filter has to be bound to $method; binding $workerName here
        // would compare a.method against the worker name and miscount.
        $queryBuilder->andWhere('a.method = :method')
            ->setParameter(':method', $method);
    }

    $count = $queryBuilder->getQuery()->getSingleScalarResult();

    if (!$count) {
        return 0;
    }

    return $count;
}
53
|
|
|
|
54
|
|
|
/**
 * Deletes the rows of the job archive class whose status is BaseJob::STATUS_EXCEPTION,
 * optionally limited to one worker and/or one method.
 *
 * @param string|null $workerName
 * @param string|null $method
 *
 * @return int Count of jobs pruned
 */
public function pruneExceptionJobs($workerName = null, $method = null)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $deleteBuilder = $entityManager->createQueryBuilder();
    $deleteBuilder->delete($this->getJobArchiveClass(), 'j');
    $deleteBuilder->where('j.status = :status');
    $deleteBuilder->setParameter(':status', BaseJob::STATUS_EXCEPTION);

    // Optional workerName / method restrictions.
    $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

    $deleted = $deleteBuilder->getQuery()->execute();

    return intval($deleted);
}
73
|
|
|
|
74
|
21 |
|
/**
 * Records which operation ran against the current object manager and refuses to
 * let "save" and the other operation both run against the same manager instance.
 *
 * The manager is identified by spl_object_hash(); one hash is remembered per
 * operation kind in a static property.
 *
 * @param string $function 'save' for a save; any other value is handled as the reset side
 *
 * @throws LogicException When the opposite operation was already recorded for this object manager
 */
protected function resetSaveOk($function)
{
    $managerHash = spl_object_hash($this->getObjectManager());
    $isSave = 'save' === $function;

    // Look at the marker left behind by the *opposite* operation.
    $oppositeMarker = $isSave ? static::$resetInsertCalled : static::$saveInsertCalled;

    if ($managerHash === $oppositeMarker) {
        // Insert SQL is cached...
        throw new LogicException("Can't call save and reset within the same process cycle (or using the same EntityManager)");
    }

    if ($isSave) {
        static::$saveInsertCalled = $managerHash;

        return;
    }

    static::$resetInsertCalled = $managerHash;
}
97
|
|
|
|
98
|
|
|
/**
 * Appends equality filters on j.workerName and j.method to a query builder.
 *
 * A filter is only added for an argument that is not null. The clauses refer to
 * the alias "j", so the builder must use that alias.
 *
 * @param QueryBuilder $queryBuilder Builder to extend (modified in place)
 * @param string|null  $workerName
 * @param string|null  $method
 */
protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
{
    // clause, parameter name, value
    $criteria = [
        ['j.workerName = :workerName', ':workerName', $workerName],
        ['j.method = :method', ':method', $method],
    ];

    foreach ($criteria as $criterion) {
        list($clause, $parameter, $value) = $criterion;
        if (null === $value) {
            continue;
        }

        $queryBuilder->andWhere($clause);
        $queryBuilder->setParameter($parameter, $value);
    }
}
112
|
|
|
|
113
|
1 |
|
/**
 * Sets the status of jobs to Job::STATUS_EXPIRED when they are still in
 * BaseJob::STATUS_NEW and their expiresAt is at or before the current time,
 * optionally limited to one worker and/or one method.
 *
 * @param string|null $workerName
 * @param string|null $method
 *
 * @return int Result of the update query, converted with intval()
 */
protected function updateExpired($workerName = null, $method = null)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $updateBuilder = $entityManager->createQueryBuilder();
    $updateBuilder
        ->update($this->getJobClass(), 'j')
        ->set('j.status', ':newStatus')
        ->where('j.expiresAt <= :expiresAt')
        ->andWhere('j.status = :status')
        ->setParameter(':expiresAt', Util::getMicrotimeDateTime())
        ->setParameter(':status', BaseJob::STATUS_NEW)
        ->setParameter(':newStatus', Job::STATUS_EXPIRED);

    $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

    return intval($updateBuilder->getQuery()->execute());
}
130
|
|
|
|
131
|
|
|
/**
 * Removes archived jobs older than $olderThan.
 *
 * Delegates to removeOlderThan() using the job archive class and its "updatedAt" field.
 *
 * @param \DateTime $olderThan Cut-off handed through to removeOlderThan()
 *
 * @return mixed Whatever removeOlderThan() returns (presumably the number of removed rows; confirm)
 */
public function pruneArchivedJobs(\DateTime $olderThan)
{
    $archiveClass = $this->getJobArchiveClass();
    $dateField = 'updatedAt';

    return $this->removeOlderThan($archiveClass, $dateField, $olderThan);
}
144
|
|
|
|
145
|
2 |
|
/**
 * Counts the rows of the job class that are due (whenUs is null or not later than
 * now) and not expired (expiresAt is null or later than now), optionally limited
 * to one worker and/or one method. No status filter is applied.
 *
 * @param string|null $workerName
 * @param string|null $method
 *
 * @return mixed The single scalar result of the count query
 */
public function getJobCount($workerName = null, $method = null)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $countBuilder = $entityManager->createQueryBuilder();
    $countBuilder->select('count(j)')->from($this->getJobClass(), 'j');
    $expr = $countBuilder->expr();

    $hasWorker = null !== $workerName;
    $hasMethod = null !== $method;

    if ($hasWorker && $hasMethod) {
        $countBuilder->where($expr->andX(
            $expr->eq('j.workerName', ':workerName'),
            $expr->eq('j.method', ':method')
        ));
        $countBuilder->setParameter(':method', $method);
        $countBuilder->setParameter(':workerName', $workerName);
    } elseif ($hasWorker) {
        $countBuilder->where('j.workerName = :workerName');
        $countBuilder->setParameter(':workerName', $workerName);
    } elseif ($hasMethod) {
        $countBuilder->where('j.method = :method');
        $countBuilder->setParameter(':method', $method);
    }

    // The first restriction goes through where(); once a worker/method filter
    // exists, the time filters are appended with andWhere() instead.
    $firstClause = ($hasWorker || $hasMethod) ? 'andWhere' : 'where';

    // Filter
    $countBuilder->$firstClause($expr->orX(
        $expr->isNull('j.whenUs'),
        $expr->lte('j.whenUs', ':whenUs')
    ));
    $countBuilder->andWhere($expr->orX(
        $expr->isNull('j.expiresAt'),
        $expr->gt('j.expiresAt', ':expiresAt')
    ));
    $countBuilder->setParameter(':whenUs', Util::getMicrotimeDecimal());
    $countBuilder->setParameter(':expiresAt', Util::getMicrotimeDateTime());

    return $countBuilder->getQuery()->getSingleScalarResult();
}
188
|
|
|
|
189
|
|
|
/**
 * Get Jobs statuses.
 *
 * Gathers the per-status counts of the job class and of the job archive class
 * into one structure.
 *
 * @return array Counts indexed by the key built in getStatusByEntityName() and then
 *               by status; the statuses of each entry are ordered by key
 */
public function getStatus()
{
    $collected = [];
    $this->getStatusByEntityName($this->getJobClass(), $collected);
    $this->getStatusByEntityName($this->getJobArchiveClass(), $collected);

    $summary = [];
    foreach ($collected as $name => $counts) {
        // Sorts the local copy only; $collected itself is left as it was.
        ksort($counts);
        foreach ($counts as $status => $count) {
            if (!isset($summary[$name][$status])) {
                $summary[$name][$status] = $count;
                continue;
            }

            $summary[$name][$status] += $count;
        }
    }

    return $summary;
}
212
|
|
|
|
213
|
|
|
/**
 * Adds the per-status row counts of one entity class to $result.
 *
 * Rows are grouped by worker name, method and status. Every worker/method pair is
 * keyed as "<workerName>-><method>()" and starts out with a zero count for each
 * of the statuses listed below.
 *
 * @param string $entityName Entity class whose repository is queried
 * @param array  $result     Accumulator, modified in place
 */
protected function getStatusByEntityName($entityName, array &$result)
{
    /** @var EntityManager $objectManager */
    $objectManager = $this->getObjectManager();
    $rows = $objectManager->getRepository($entityName)->createQueryBuilder('j')
        ->select('j.workerName, j.method, j.status, count(j) as c')
        ->groupBy('j.workerName, j.method, j.status')
        ->getQuery()
        ->getArrayResult();

    foreach ($rows as $row) {
        $key = $row['workerName'].'->'.$row['method'].'()';
        $status = $row['status'];

        if (!isset($result[$key])) {
            $result[$key] = [BaseJob::STATUS_NEW => 0,
                BaseJob::STATUS_RUNNING => 0,
                BaseJob::STATUS_SUCCESS => 0,
                BaseJob::STATUS_FAILURE => 0,
                BaseJob::STATUS_EXCEPTION => 0,
                StallableJob::STATUS_STALLED => 0,
                \Dtc\QueueBundle\Model\Job::STATUS_EXPIRED => 0,
                RetryableJob::STATUS_MAX_FAILURES => 0,
                RetryableJob::STATUS_MAX_EXCEPTIONS => 0,
                StallableJob::STATUS_MAX_STALLS => 0,
                RetryableJob::STATUS_MAX_RETRIES => 0, ];
        }

        // A row can carry a status that is not in the list above (archiveAllJobs()
        // writes Job::STATUS_ARCHIVE, for instance). Start such a status at zero so
        // the += below never reads an undefined array key.
        if (!isset($result[$key][$status])) {
            $result[$key][$status] = 0;
        }

        $result[$key][$status] += intval($row['c']);
    }
}
241
|
|
|
|
242
|
|
|
/**
 * Get the next job to run (can be filtered by workername and method name).
 *
 * Fetches up to 100 candidate job ids at a time and tries to take them one by
 * one with takeJob(); the first job that takeJob() hands back is returned. When
 * a whole batch could not be taken, a fresh batch is fetched. The search ends
 * with null as soon as the candidate query returns nothing.
 *
 * @param string $workerName
 * @param string $methodName
 * @param bool   $prioritize
 * @param int    $runId
 *
 * @return Job|null
 */
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
{
    while (true) {
        /** @var QueryBuilder $candidateQuery */
        $candidateQuery = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $candidateQuery->select('j.id');
        $candidateQuery->setMaxResults(100);

        $candidates = $candidateQuery->getQuery()->getResult();
        if (empty($candidates)) {
            return null;
        }

        foreach ($candidates as $candidate) {
            $taken = $this->takeJob($candidate['id'], $runId);
            if ($taken) {
                return $taken;
            }
        }
    }
}
273
|
|
|
|
274
|
|
|
/**
 * Builds the query used to look for runnable jobs.
 *
 * Starts from the repository's builder (alias "j"), applies the standard
 * predicate and the optional worker/method filters, then orders the rows:
 * by priority (descending) and whenUs (ascending) when $prioritize is truthy,
 * otherwise by whenUs (ascending) only.
 *
 * @param string|null $workerName
 * @param string|null $methodName
 * @param bool        $prioritize
 *
 * @return QueryBuilder
 */
public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
{
    /** @var EntityRepository $jobRepository */
    $jobRepository = $this->getRepository();
    $builder = $jobRepository->createQueryBuilder('j');

    $this->addStandardPredicate($builder);
    $this->addWorkerNameCriterion($builder, $workerName, $methodName);

    if (!$prioritize) {
        $builder->orderBy('j.whenUs', 'ASC');

        return $builder;
    }

    $builder->addOrderBy('j.priority', 'DESC');
    $builder->addOrderBy('j.whenUs', 'ASC');

    return $builder;
}
298
|
|
|
|
299
|
12 |
|
/**
 * Restricts a query (alias "j") to jobs that are in BaseJob::STATUS_NEW, are due
 * (whenUs is null or not later than now) and are not expired (expiresAt is null
 * or later than now).
 *
 * The restriction is installed with where(), not andWhere(), so it should be
 * applied before any additional andWhere() filters.
 *
 * @param QueryBuilder $queryBuilder Builder to restrict (modified in place)
 */
protected function addStandardPredicate(QueryBuilder $queryBuilder)
{
    // One timestamp feeds both comparisons so they refer to the same instant.
    $now = Util::getMicrotimeDateTime();
    $nowDecimal = Util::getMicrotimeDecimalFormat($now);

    $expr = $queryBuilder->expr();
    $isDue = $expr->orX(
        $expr->isNull('j.whenUs'),
        $expr->lte('j.whenUs', ':whenUs')
    );
    $notExpired = $expr->orX(
        $expr->isNull('j.expiresAt'),
        $expr->gt('j.expiresAt', ':expiresAt')
    );

    $queryBuilder->where('j.status = :status');
    $queryBuilder->andWhere($isDue);
    $queryBuilder->andWhere($notExpired);

    $queryBuilder->setParameter(':status', BaseJob::STATUS_NEW);
    $queryBuilder->setParameter(':whenUs', $nowDecimal);
    $queryBuilder->setParameter(':expiresAt', $now);
}
317
|
|
|
|
318
|
|
|
/**
 * Tries to claim one job for execution.
 *
 * Issues a single UPDATE that sets the job to BaseJob::STATUS_RUNNING, stamps
 * startedAt and, when given, the run id. The job is only handed back when that
 * UPDATE reports exactly one affected row.
 *
 * @param mixed    $jobId Identifier of the job to claim
 * @param int|null $runId Run id to store on the job; left untouched when null
 *
 * @return Job|null The claimed job as loaded from the repository, or null when the claim failed
 */
protected function takeJob($jobId, $runId = null)
{
    /** @var EntityRepository $repository */
    $repository = $this->getRepository();
    /** @var QueryBuilder $queryBuilder */
    $queryBuilder = $repository->createQueryBuilder('j');
    $queryBuilder
        ->update()
        ->set('j.status', ':status')
        ->setParameter(':status', BaseJob::STATUS_RUNNING);
    if (null !== $runId) {
        $queryBuilder
            ->set('j.runId', ':runId')
            ->setParameter(':runId', $runId);
    }
    $queryBuilder->set('j.startedAt', ':startedAt')
        ->setParameter(':startedAt', Util::getMicrotimeDateTime());

    // Only claim the job while it is still NEW. Filtering on the id alone lets a
    // second worker that selected the same candidate update the row again and
    // also see one affected row, so both workers would run the same job.
    $queryBuilder->where('j.id = :id')
        ->andWhere('j.status = :currentStatus')
        ->setParameter(':id', $jobId)
        ->setParameter(':currentStatus', BaseJob::STATUS_NEW);
    $resultCount = $queryBuilder->getQuery()->execute();

    if (1 === $resultCount) {
        return $repository->find($jobId);
    }

    return null;
}
348
|
|
|
|
349
|
|
|
/**
 * Tries to update the nearest job as a batch.
 *
 * Looks up the job in BaseJob::STATUS_NEW with the same crcHash as $job and the
 * lowest whenUs. When one exists it is updated, through updateBatchJob(), to the
 * larger of the two priorities and to $job's whenUs if bccomp() ranks that
 * below the existing job's whenUs.
 *
 * @param \Dtc\QueueBundle\Model\Job $job
 *
 * @return null|Job The existing job that was matched, or null when there is none
 *
 * @throws UnsupportedException When $job is not an instance of the ORM Job entity
 */
public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
{
    if (!$job instanceof Job) {
        throw new UnsupportedException('$job must be instance of '.Job::class);
    }

    /** @var QueryBuilder $lookup */
    $lookup = $this->getRepository()->createQueryBuilder('j');
    $lookup->select();
    $lookup->where('j.crcHash = :crcHash');
    $lookup->andWhere('j.status = :status');
    $lookup->setParameter(':status', BaseJob::STATUS_NEW);
    $lookup->setParameter(':crcHash', $job->getCrcHash());
    $lookup->orderBy('j.whenUs', 'ASC');
    $lookup->setMaxResults(1);

    $matches = $lookup->getQuery()->execute();
    if (empty($matches)) {
        return null;
    }

    /** @var Job $nearest */
    $nearest = $matches[0];

    $priority = max($job->getPriority(), $nearest->getPriority());

    // Keep the existing whenUs unless the incoming job is scheduled earlier.
    $whenUs = $nearest->getWhenUs();
    if (bccomp($job->getWhenUs(), $nearest->getWhenUs()) < 0) {
        $whenUs = $job->getWhenUs();
    }

    $this->updateBatchJob($nearest, $priority, $whenUs);

    return $nearest;
}
391
|
|
|
|
392
|
|
|
/**
 * Writes a new priority and/or whenUs onto an existing job.
 *
 * Only the values that differ (strict comparison) from the job's current ones
 * are set, both on the entity object and through an UPDATE query for its id.
 * Nothing is executed when neither value differs.
 *
 * @param Job    $existingJob Job to update
 * @param int    $newPriority
 * @param string $newWhenUs
 *
 * @return Job The same $existingJob instance
 */
protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
{
    $priorityChanged = $newPriority !== $existingJob->getPriority();
    $whenUsChanged = $newWhenUs !== $existingJob->getWhenUs();

    if (!$priorityChanged && !$whenUsChanged) {
        return $existingJob;
    }

    /** @var EntityRepository $repository */
    $repository = $this->getRepository();
    /** @var QueryBuilder $update */
    $update = $repository->createQueryBuilder('j');
    $update->update();

    if ($priorityChanged) {
        $existingJob->setPriority($newPriority);
        $update->set('j.priority', ':priority');
        $update->setParameter(':priority', $newPriority);
    }

    if ($whenUsChanged) {
        $existingJob->setWhenUs($newWhenUs);
        $update->set('j.whenUs', ':whenUs');
        $update->setParameter(':whenUs', $newWhenUs);
    }

    $update->where('j.id = :id');
    $update->setParameter(':id', $existingJob->getId());
    $update->getQuery()->execute();

    return $existingJob;
}
424
|
|
|
|
425
|
1 |
|
/**
 * Lists the distinct worker name / method combinations among the jobs that
 * match the standard predicate.
 *
 * @return array Method names grouped by worker name; an empty array when no job matches
 */
public function getWorkersAndMethods()
{
    /** @var EntityRepository $repository */
    $repository = $this->getRepository();
    $builder = $repository->createQueryBuilder('j');
    $this->addStandardPredicate($builder);
    $builder->select('DISTINCT j.workerName, j.method');

    $grouped = [];
    foreach ($builder->getQuery()->getArrayResult() as $row) {
        $grouped[$row['workerName']][] = $row['method'];
    }

    return $grouped;
}
445
|
|
|
|
446
|
|
|
/**
 * Counts the jobs that match the standard predicate, optionally limited to one
 * worker and/or one method.
 *
 * @param string $workerName
 * @param string $methodName
 *
 * @return mixed The single scalar result of the count query
 */
public function countLiveJobs($workerName = null, $methodName = null)
{
    /** @var EntityRepository $jobRepository */
    $jobRepository = $this->getRepository();

    $counter = $jobRepository->createQueryBuilder('j');
    $this->addStandardPredicate($counter);
    $this->addWorkerNameCriterion($counter, $workerName, $methodName);
    $counter->select('count(j.id)');

    $query = $counter->getQuery();

    return $query->getSingleScalarResult();
}
461
|
|
|
|
462
|
|
|
/**
 * Archives the jobs that match the standard predicate, optionally limited to
 * one worker and/or one method.
 *
 * The matching jobs are first switched to Job::STATUS_ARCHIVE with one UPDATE;
 * runArchive() is then invoked if that UPDATE reported any affected rows.
 *
 * @param string   $workerName
 * @param string   $methodName
 * @param \Closure $progressCallback Handed on to runArchive()
 */
public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback)
{
    // First mark all Live non-running jobs as Archive
    /** @var QueryBuilder $markBuilder */
    $markBuilder = $this->getRepository()->createQueryBuilder('j');
    $markBuilder->update($this->getJobClass(), 'j');
    $markBuilder->set('j.status', ':statusArchive');
    $markBuilder->setParameter(':statusArchive', Job::STATUS_ARCHIVE);
    $this->addStandardPredicate($markBuilder);
    $this->addWorkerNameCriterion($markBuilder, $workerName, $methodName);

    $marked = $markBuilder->getQuery()->execute();
    if (!$marked) {
        return;
    }

    $this->runArchive($workerName, $methodName, $progressCallback);
}
484
|
|
|
|
485
|
|
|
/**
 * Removes the jobs that are in Job::STATUS_ARCHIVE from the job repository.
 *
 * Jobs are read in batches of up to 10000 rows; each one is loaded by id and
 * passed to the entity manager's remove(). Changes are flushed, and the progress
 * callback is called with the running total, after every 10th job and again at
 * the end of each batch. Another batch is read while the last one was full.
 *
 * NOTE(review): presumably removing the entity is what moves it to the archive
 * table (e.g. via a listener) - confirm; nothing in this method writes to it.
 * The $workerName and $methodName arguments are not used here.
 *
 * @param string|null $workerName
 * @param string|null $methodName
 * @param \Closure    $progressCallback Called with the number of jobs processed so far
 */
protected function runArchive($workerName = null, $methodName = null, $progressCallback)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();
    $batchSize = 10000;
    $processed = 0;

    do {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $rows = $repository->createQueryBuilder('j')
            ->where('j.status = :status')
            ->setParameter(':status', Job::STATUS_ARCHIVE)
            ->setMaxResults($batchSize)
            ->getQuery()
            ->getArrayResult();

        foreach ($rows as $row) {
            $entity = $repository->find($row['id']);
            if ($entity) {
                $entityManager->remove($entity);
            }

            ++$processed;
            if (0 == $processed % 10) {
                $this->flush();
                $progressCallback($processed);
            }
        }

        $this->flush();
        $progressCallback($processed);
        // A full batch means there may be more rows left to process.
    } while ($batchSize == count($rows));
}
524
|
|
|
} |
525
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.