1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\Doctrine; |
4
|
|
|
|
5
|
|
|
use Doctrine\ODM\MongoDB\DocumentRepository; |
6
|
|
|
use Doctrine\ORM\EntityRepository; |
7
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
8
|
|
|
use Dtc\QueueBundle\Model\RetryableJob; |
9
|
|
|
use Dtc\QueueBundle\Model\Job; |
10
|
|
|
use Dtc\QueueBundle\Model\JobTiming; |
11
|
|
|
use Dtc\QueueBundle\Model\StallableJob; |
12
|
|
|
use Dtc\QueueBundle\Model\Run; |
13
|
|
|
use Dtc\QueueBundle\Util\Util; |
14
|
|
|
|
15
|
|
|
abstract class DoctrineJobManager extends BaseDoctrineJobManager
{
    /** Number of jobs to prune / reset / gather at a time */
    const FETCH_COUNT = 100;

    /** Number of seconds before a job is considered stalled if the runner is no longer active */
    const STALLED_SECONDS = 1800;

    /**
     * Counts the jobs of the given class that are in the given status.
     *
     * Within this class the result is used as the upper bound of the batch loops.
     *
     * @param string      $objectName class name of the job (or job archive) object
     * @param mixed       $status     a job status constant, e.g. Job::STATUS_EXCEPTION
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     */
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);

    /**
     * Resets archived jobs that are in the exception status, FETCH_COUNT at a time.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int sum of the counts returned by resetJobsByCriterion()
     */
    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);

        $criterion = ['status' => Job::STATUS_EXCEPTION];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        // NOTE(review): resetArchiveJob() removes the archive rows it resets, so an
        // increasing offset may skip rows once more than FETCH_COUNT jobs match.
        // Whether offset 0 would be correct depends on what updateMaxStatus() does to
        // the status of jobs that hit their retry limit - confirm before changing.
        $countProcessed = 0;
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $countProcessed += $this->resetJobsByCriterion(
                $criterion,
                static::FETCH_COUNT,
                $i
            );
        }

        return $countProcessed;
    }

    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * @param null $workerName
     * @param null $method
     *
     * @return mixed used by pruneExpiredJobs() as the number of jobs to prune
     */
    abstract protected function updateExpired($workerName = null, $method = null);

    /**
     * Adds the worker name and / or method to a findBy() criterion when they are given.
     *
     * @param array       $criterion  criterion to extend (modified in place)
     * @param string|null $workerName stored under the 'workerName' key when not null
     * @param string|null $method     stored under the 'method' key when not null
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        if (null !== $workerName) {
            $criterion['workerName'] = $workerName;
        }
        if (null !== $method) {
            $criterion['method'] = $method;
        }
    }

    /**
     * Marks expired jobs via updateExpired() and then removes them, FETCH_COUNT at a time.
     *
     * One JobTiming::STATUS_FINISHED_EXPIRED timing is recorded per removed job, after
     * the batch that contained it has been flushed.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int number of jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            // Always read from offset 0: the previous batch has been removed and
            // flushed, so the remaining expired jobs have moved to the front of the
            // result set. Paging with an increasing offset would skip every other batch.
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, 0);
            $innerCount = 0;
            if (!empty($expiredJobs)) {
                foreach ($expiredJobs as $expiredJob) {
                    /* @var Job $expiredJob */
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
                    $objectManager->remove($expiredJob);
                    ++$finalCount;
                    ++$innerCount;
                }
            }
            $this->flush();
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
            if (0 === $innerCount) {
                // Nothing left to prune; further queries would return nothing as well.
                break;
            }
        }

        return $finalCount;
    }

    /**
     * Gathers the running jobs and returns the ones that are considered stalled.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return array list of stalled jobs
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);

        $criterion = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $runningJobs = $this->findRunningJobs($criterion, $count);

        return $this->extractStalledJobs($runningJobs);
    }

    /**
     * Loads the jobs matching the criterion in batches and groups them by run id.
     *
     * Jobs whose run id is null are left out of the result.
     *
     * @param array $criterion findBy() criterion
     * @param int   $count     number of matching jobs, used as the paging limit
     *
     * @return array map of run id => list of jobs belonging to that run
     */
    protected function findRunningJobs($criterion, $count)
    {
        $repository = $this->getRepository();
        $runningJobsById = [];

        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $runningJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
            if (!empty($runningJobs)) {
                foreach ($runningJobs as $job) {
                    /** @var StallableJob $job */
                    if (null !== $runId = $job->getRunId()) {
                        $runningJobsById[$runId][] = $job;
                    }
                }
            }
        }

        return $runningJobsById;
    }

    /**
     * Handles jobs whose run still exists in the run repository: every job of that
     * run except the one the run reports as its current job is added to $stalledJobs.
     *
     * Nothing is added when no run with the given id is found.
     *
     * @param mixed $runId       id of the run the jobs belong to
     * @param array $jobs        jobs attached to that run
     * @param array $stalledJobs list the stalled jobs are appended to (modified in place)
     */
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
    {
        $objectManager = $this->getObjectManager();
        $runRepository = $objectManager->getRepository($this->getRunManager()->getRunClass());
        if ($run = $runRepository->find($runId)) {
            foreach ($jobs as $job) {
                if ($run->getCurrentJobId() == $job->getId()) {
                    continue;
                }
                $stalledJobs[] = $job;
            }
        }
    }

    /**
     * Collects the stalled jobs out of a set of running jobs grouped by run id.
     *
     * @param array $runningJobsById map of run id => list of running jobs
     *
     * @return array list of stalled jobs
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $stalledJobs = [];
        foreach (array_keys($runningJobsById) as $runId) {
            $this->extractStalledLiveRuns($runId, $runningJobsById[$runId], $stalledJobs);
            $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId);
        }

        return $stalledJobs;
    }

    /**
     * Handles jobs whose run has been archived: when the archived run ended more than
     * STALLED_SECONDS ago, all jobs of that run are added to $stalledJobs.
     *
     * Does nothing when the run manager offers no getObjectManager() or
     * getRunArchiveClass() method.
     *
     * @param array $runningJobsById map of run id => list of running jobs
     * @param array $stalledJobs     list the stalled jobs are merged into (modified in place)
     * @param mixed $runId           id of the run to look up in the run archive
     */
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
    {
        $runManager = $this->getRunManager();
        if (!method_exists($runManager, 'getObjectManager')) {
            return;
        }
        if (!method_exists($runManager, 'getRunArchiveClass')) {
            return;
        }

        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
        $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());
        /** @var Run $run */
        if ($run = $runArchiveRepository->find($runId)) {
            if ($endTime = $run->getEndedAt()) {
                // Did the run end more than STALLED_SECONDS ago?
                if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
                    $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
                }
            }
        }
    }

    /**
     * Marks one batch of stalled jobs as stalled and hands each of them to saveHistory().
     *
     * The batch covers the indexes $i up to (but excluding) $i + FETCH_COUNT, capped at $count.
     *
     * @param int   $i           index of the first job of the batch
     * @param int   $count       total number of stalled jobs
     * @param array $stalledJobs list of stalled jobs, indexed from 0
     *
     * @return int number of jobs for which saveHistory() returned a truthy value
     */
    protected function runStalledLoop($i, $count, array $stalledJobs)
    {
        $resetCount = 0;
        for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
            /** @var StallableJob $job */
            $job = $stalledJobs[$j];
            $job->setStatus(StallableJob::STATUS_STALLED);
            if ($this->saveHistory($job)) {
                ++$resetCount;
            }
        }

        return $resetCount;
    }

    /**
     * Processes all stalled jobs through runStalledLoop(), FETCH_COUNT at a time.
     *
     * One JobTiming::STATUS_FINISHED_STALLED timing is recorded for every stalled job
     * in a batch, whether or not runStalledLoop() counted it.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int sum of the counts returned by runStalledLoop()
     */
    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);

        $countReset = 0;
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
            $resetCount = $this->runStalledLoop($i, $count, $stalledJobs);
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }
            $countReset += $resetCount;
            $this->flush();
        }

        return $countReset;
    }

    /**
     * Marks all stalled jobs as stalled, increments their stall counter and deletes
     * them, flushing after every FETCH_COUNT jobs.
     *
     * @param string $workerName
     * @param string $method
     *
     * @return int number of jobs passed to deleteJob()
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);

        $countProcessed = 0;
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
                /** @var StallableJob $job */
                $job = $stalledJobs[$j];
                $job->setStatus(StallableJob::STATUS_STALLED);
                $job->setStalls(intval($job->getStalls()) + 1);
                $this->deleteJob($job);
                ++$countProcessed;
            }
            $this->flush();
        }

        return $countProcessed;
    }

    /**
     * Deletes the job unless it is going to be retried.
     *
     * @param StallableJob $job   job whose history is being saved
     * @param mixed        $retry truthy when the job is retried and must be kept
     *
     * @return mixed the $retry value, unchanged
     */
    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if (!$retry) {
            $this->deleteJob($job);
        }

        return $retry;
    }

    /**
     * Computes the job's hash and persists the job.
     *
     * When the job is a batch job and updateNearestBatch() returns an existing job,
     * that job is returned and the given job is not persisted.
     *
     * @param StallableJob $job job to save
     *
     * @return mixed the result of updateNearestBatch() when truthy, otherwise $job
     */
    protected function stallableSave(StallableJob $job)
    {
        // Generate the hash for the job (stored through setCrcHash(), computed with SHA-256)
        $hashValues = [get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs()];
        $crcHash = hash('sha256', serialize($hashValues));
        $job->setCrcHash($crcHash);
        $objectManager = $this->getObjectManager();

        if (true === $job->getBatch()) {
            $oldJob = $this->updateNearestBatch($job);
            if ($oldJob) {
                return $oldJob;
            }
        }

        // Just save a new job
        $this->resetSaveOk(__FUNCTION__);
        $objectManager->persist($job);
        $objectManager->flush();

        return $job;
    }

    /**
     * Looks for an existing job the given batch job can be folded into.
     *
     * stallableSave() returns the result instead of saving $job when it is truthy.
     *
     * @param Job $job batch job about to be saved
     */
    abstract protected function updateNearestBatch(Job $job);

    /**
     * Called by resetJobsByCriterion() before archived jobs are reset.
     *
     * @param string $objectName class name of the job object
     */
    abstract protected function stopIdGenerator($objectName);

    /**
     * Called by resetJobsByCriterion() once the reset is done; counterpart of stopIdGenerator().
     *
     * @param string $objectName class name of the job object
     */
    abstract protected function restoreIdGenerator($objectName);

    /**
     * Resets one page of archived jobs matching the criterion.
     *
     * The archive is read ordered by the job identifier column (falling back to 'id').
     * restoreIdGenerator() is called even when the reset throws.
     *
     * @param array $criterion findBy() criterion for the job archive
     * @param int   $limit     maximum number of archived jobs to load
     * @param int   $offset    offset into the matching archived jobs
     *
     * @return int sum of the counts returned by resetArchiveJob()
     */
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset
    ) {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getJobClass();
        $archiveObjectName = $this->getJobArchiveClass();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $this->stopIdGenerator($objectName);
        $countProcessed = 0;

        try {
            $identifierData = $metadata->getIdentifier();
            $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
            $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

            foreach ($results as $jobArchive) {
                $countProcessed += $this->resetArchiveJob($jobArchive);
            }
            $objectManager->flush();
        } finally {
            // Never leave the id generator stopped, even if the query or a reset failed.
            $this->restoreIdGenerator($objectName);
        }

        return $countProcessed;
    }

    /**
     * Called with the name of the calling function before jobs are saved or reset.
     *
     * Does nothing in this class.
     *
     * @param string $function name of the calling function
     */
    protected function resetSaveOk($function)
    {
    }

    /**
     * @param null $workerName
     * @param null $methodName
     * @param bool $prioritize
     */
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);

    /**
     * Resets a single archived job.
     *
     * When updateMaxStatus() reports that the retry limit applies, the archived job is
     * only persisted. Otherwise a new job of the live job class is created as a copy
     * of the archived one, reset through resetJob(), and the archived job is removed.
     *
     * @param StallableJob $jobArchive archived job to reset
     *
     * @return int Number of jobs reset (0 or 1)
     */
    protected function resetArchiveJob(StallableJob $jobArchive)
    {
        $objectManager = $this->getObjectManager();
        if ($this->updateMaxStatus($jobArchive, StallableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) {
            $objectManager->persist($jobArchive);

            return 0;
        }

        $className = $this->getJobClass();
        /** @var StallableJob $newJob */
        $newJob = new $className();
        Util::copy($jobArchive, $newJob);
        $this->resetJob($newJob);
        $objectManager->remove($jobArchive);
        $this->flush();

        return 1;
    }

    /**
     * Puts a job back into the new status so that it can run again.
     *
     * Clears message, start / finish times and elapsed time, increments the retry
     * counter, then persists and flushes the job.
     *
     * @param RetryableJob $job job to reset; must be a StallableJob
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a StallableJob
     */
    protected function resetJob(RetryableJob $job)
    {
        if (!$job instanceof StallableJob) {
            throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
        }
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setFinishedAt(null);
        $job->setStartedAt(null);
        $job->setElapsed(null);
        $job->setRetries($job->getRetries() + 1);
        $this->getObjectManager()->persist($job);
        $this->flush();

        return true;
    }

    abstract public function getWorkersAndMethods();

    /**
     * @param string|null $workerName optional worker name filter
     * @param string|null $methodName optional method filter
     */
    abstract public function countLiveJobs($workerName = null, $methodName = null);

    /**
     * NOTE(review): $progressCallback is a required parameter declared after optional
     * ones, which PHP 8 reports as deprecated. It is left as is because giving it a
     * default here would break implementations that declare it as required.
     *
     * @param string|null $workerName       optional worker name filter
     * @param string|null $methodName       optional method filter
     * @param mixed       $progressCallback presumably a callable reporting progress - confirm against implementations
     */
    abstract public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback);
}
377
|
|
|
|
This check looks at variables that are passed out again to other methods.
If the outgoing method call has stricter type requirements than the method itself, an issue is raised.
An additional type check may prevent trouble.