1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\Doctrine; |
4
|
|
|
|
5
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
6
|
|
|
use Dtc\QueueBundle\Model\Job; |
7
|
|
|
use Dtc\QueueBundle\Model\JobTiming; |
8
|
|
|
use Dtc\QueueBundle\Model\RetryableJob; |
9
|
|
|
use Dtc\QueueBundle\Model\StallableJob; |
10
|
|
|
|
11
|
|
|
/**
 * Base job manager for Doctrine-backed job storage.
 *
 * Implements the reset/prune operations for exception, expired and stalled
 * jobs on top of a handful of storage-specific primitives that concrete
 * subclasses must provide (the abstract methods declared below).
 */
abstract class DoctrineJobManager extends BaseDoctrineJobManager
{
    use ProgressCallbackTrait;
    use StalledTrait;

    /** Number of seconds before a job is considered stalled if the runner is no longer active */
    const STALLED_SECONDS = 1800;

    /**
     * Counts the jobs of the given class that have the given status.
     *
     * @param string      $objectName class name of the job objects to count
     * @param mixed       $status     status value to match
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     */
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);

    /**
     * Resets jobs that ended with Job::STATUS_EXCEPTION, working in batches.
     *
     * The number of candidates is taken from the job archive class; the jobs
     * are then handed to resetJobsByCriterion() one batch at a time.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int sum of the values returned by resetJobsByCriterion()
     */
    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);

        $criterion = ['status' => Job::STATUS_EXCEPTION];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $saveCount = $this->getSaveCount($count);
        $countProcessed = 0;
        for ($i = 0; $i < $count; $i += $saveCount) {
            $countProcessed += $this->resetJobsByCriterion(
                $criterion,
                $saveCount,
                $i
            );
        }

        return $countProcessed;
    }

    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed
     */
    abstract protected function updateExpired($workerName = null, $method = null);

    /**
     * Marks expired jobs via updateExpired() and then removes every job whose
     * status is Job::STATUS_EXPIRED, in batches, recording one
     * JobTiming::STATUS_FINISHED_EXPIRED timing per removed job.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int number of jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;

        // Order by the identifier so every batch is read in a stable order.
        $metadata = $objectManager->getClassMetadata($this->getJobClass());
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';

        $fetchCount = $this->getFetchCount($count);
        for ($i = 0; $i < $count; $i += $fetchCount) {
            // Always read from offset 0. Each pass removes and flushes the rows
            // it fetched, so they no longer match the criterion; paging with a
            // growing offset would skip over the rows that moved up to fill the
            // gap and leave them unpruned.
            $expiredJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, 0);
            if (empty($expiredJobs)) {
                // Nothing left to prune; later passes would be empty as well.
                break;
            }

            $innerCount = 0;
            foreach ($expiredJobs as $expiredJob) {
                /* @var Job $expiredJob */
                $expiredJob->setStatus(Job::STATUS_EXPIRED);
                $objectManager->remove($expiredJob);
                ++$finalCount;
                ++$innerCount;
            }
            $this->flush();

            // Timings are recorded only after the batch has been flushed.
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
        }

        return $finalCount;
    }

    /**
     * Resets stalled jobs in batches, reporting progress through the optional
     * callback and recording a JobTiming::STATUS_FINISHED_STALLED timing for
     * every stalled job in each batch.
     *
     * @param string|null   $workerName       optional worker name filter
     * @param string|null   $method           optional method filter
     * @param callable|null $progressCallback passed to updateProgress() before the first batch and after each batch
     *
     * @return int sum of the values returned by runStalledLoop()
     */
    public function resetStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $stalledJobsCount = count($stalledJobs);
        $this->updateProgress($progressCallback, 0, $stalledJobsCount);
        $countReset = 0;
        $saveCount = $this->getSaveCount($stalledJobsCount);
        for ($i = 0; $i < $stalledJobsCount; $i += $saveCount) {
            $resetCount = $this->runStalledLoop($i, $stalledJobsCount, $saveCount, $stalledJobs);
            // One timing per job in this batch window, clamped to the list size.
            for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $stalledJobsCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }
            $countReset += $resetCount;
            $this->flush();
            $this->updateProgress($progressCallback, $countReset, $stalledJobsCount);
        }

        return $countReset;
    }

    /**
     * Deletes stalled jobs in batches. Each job is first marked
     * StallableJob::STATUS_STALLED and has its stall counter incremented,
     * then passed to deleteJob().
     *
     * @param string|null   $workerName       optional worker name filter
     * @param string|null   $method           optional method filter
     * @param callable|null $progressCallback passed to updateProgress() before the first batch and after each batch
     *
     * @return int number of jobs processed
     */
    public function pruneStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $stalledJobsCount = count($stalledJobs);
        $this->updateProgress($progressCallback, 0, $stalledJobsCount);
        $countProcessed = 0;
        $saveCount = $this->getSaveCount($stalledJobsCount);
        for ($i = 0; $i < $stalledJobsCount; $i += $saveCount) {
            for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $stalledJobsCount; ++$j) {
                /** @var StallableJob $job */
                $job = $stalledJobs[$j];
                $job->setStatus(StallableJob::STATUS_STALLED);
                // intval() turns a missing (null) stall counter into 0 before incrementing.
                $job->setStalls(intval($job->getStalls()) + 1);
                $this->deleteJob($job);
                ++$countProcessed;
            }
            $this->flush();
            $this->updateProgress($progressCallback, $countProcessed, $stalledJobsCount);
        }

        return $countProcessed;
    }

    /**
     * Deletes the job when it is not going to be retried.
     *
     * @param StallableJob $job   the job being finalized
     * @param mixed        $retry falsy when the job will not be retried
     *
     * @return mixed the $retry value, unchanged
     */
    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if (!$retry) {
            $this->deleteJob($job);
        }

        return $retry;
    }

    /**
     * Stores a job, first stamping it with a hash of its identity.
     *
     * When the job's batch flag is exactly true and updateNearestBatch()
     * returns a job, that job is returned and nothing new is persisted.
     *
     * @param StallableJob $job the job to store
     *
     * @return mixed the job returned by updateNearestBatch(), or the newly persisted $job
     */
    protected function stallableSave(StallableJob $job)
    {
        // The value stored through setCrcHash() is a SHA-256 digest of the
        // serialized job class, method, worker name and arguments.
        $hashValues = [get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs()];
        $crcHash = hash('sha256', serialize($hashValues));
        $job->setCrcHash($crcHash);

        if (true === $job->getBatch()) {
            $oldJob = $this->updateNearestBatch($job);
            if ($oldJob) {
                return $oldJob;
            }
        }

        // Just save a new job
        $this->persist($job);

        return $job;
    }

    /**
     * Storage-specific batch lookup used by stallableSave(); a truthy return
     * value is handed back to the caller instead of persisting $job.
     */
    abstract protected function updateNearestBatch(Job $job);

    /**
     * @param string $objectName
     */
    abstract protected function stopIdGenerator($objectName);

    /**
     * @param string $objectName
     */
    abstract protected function restoreIdGenerator($objectName);

    /**
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     */
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);

    /**
     * Returns a job to Job status NEW so it can be picked up again: clears the
     * message, timestamps and elapsed time, increments the retry counter and
     * persists the job.
     *
     * @param RetryableJob $job must also be a StallableJob
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a StallableJob
     */
    protected function resetJob(RetryableJob $job)
    {
        if (!$job instanceof StallableJob) {
            throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
        }
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setFinishedAt(null);
        $job->setStartedAt(null);
        $job->setElapsed(null);
        $job->setRetries($job->getRetries() + 1);
        $this->persist($job);

        return true;
    }

    abstract public function getWorkersAndMethods();

    /**
     * @param string|null $workerName
     * @param string|null $methodName
     */
    abstract public function countLiveJobs($workerName = null, $methodName = null);

    /**
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback
     */
    abstract public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null);
}
202
|
|
|
|
Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.
The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.
This check looks for comments that seem to be mostly valid code and reports them.