1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\Doctrine; |
4
|
|
|
|
5
|
|
|
use Doctrine\Common\Persistence\ObjectManager; |
6
|
|
|
use Doctrine\Common\Persistence\ObjectRepository; |
7
|
|
|
use Dtc\QueueBundle\Model\AbstractJobManager; |
8
|
|
|
use Dtc\QueueBundle\Model\Job; |
9
|
|
|
use Dtc\QueueBundle\Model\Run; |
10
|
|
|
use Dtc\QueueBundle\Util\Util; |
11
|
|
|
|
12
|
|
|
abstract class BaseJobManager extends AbstractJobManager |
13
|
|
|
{ |
14
|
|
|
/** Number of jobs to prune / reset / gather at a time */ |
15
|
|
|
const FETCH_COUNT = 100; |
16
|
|
|
|
17
|
|
|
/** Number of seconds before a job is considered stalled if the runner is no longer active */ |
18
|
|
|
const STALLED_SECONDS = 1800; |
19
|
|
|
protected $objectManager; |
20
|
|
|
protected $objectName; |
21
|
|
|
protected $archiveObjectName; |
22
|
|
|
protected $runClass; |
23
|
|
|
protected $runArchiveClass; |
24
|
|
|
|
25
|
|
|
public function __construct(ObjectManager $objectManager, |
26
|
|
|
$objectName, |
27
|
|
|
$archiveObjectName, |
28
|
|
|
$runClass, |
29
|
|
|
$runArchiveClass) |
30
|
|
|
{ |
31
|
|
|
$this->objectManager = $objectManager; |
32
|
|
|
$this->objectName = $objectName; |
33
|
|
|
$this->archiveObjectName = $archiveObjectName; |
34
|
|
|
$this->runClass = $runClass; |
35
|
|
|
$this->runArchiveClass = $runArchiveClass; |
36
|
|
|
} |
37
|
|
|
|
38
|
|
|
/** |
39
|
|
|
* @return ObjectManager |
40
|
|
|
*/ |
41
|
|
|
public function getObjectManager() |
42
|
|
|
{ |
43
|
|
|
return $this->objectManager; |
44
|
|
|
} |
45
|
|
|
|
46
|
|
|
/** |
47
|
|
|
* @return string |
48
|
|
|
*/ |
49
|
|
|
public function getObjectName() |
50
|
|
|
{ |
51
|
|
|
return $this->objectName; |
52
|
|
|
} |
53
|
|
|
|
54
|
|
|
/** |
55
|
|
|
* @return string |
56
|
|
|
*/ |
57
|
|
|
public function getArchiveObjectName() |
58
|
|
|
{ |
59
|
|
|
return $this->archiveObjectName; |
60
|
|
|
} |
61
|
|
|
|
62
|
|
|
/** |
63
|
|
|
* @return string |
64
|
|
|
*/ |
65
|
|
|
public function getRunClass() |
66
|
|
|
{ |
67
|
|
|
return $this->runClass; |
68
|
|
|
} |
69
|
|
|
|
70
|
|
|
/** |
71
|
|
|
* @return string |
72
|
|
|
*/ |
73
|
|
|
public function getRunArchiveClass() |
74
|
|
|
{ |
75
|
|
|
return $this->runArchiveClass; |
76
|
|
|
} |
77
|
|
|
|
78
|
|
|
/** |
79
|
|
|
* @return ObjectRepository |
80
|
|
|
*/ |
81
|
|
|
public function getRepository() |
82
|
|
|
{ |
83
|
|
|
return $this->getObjectManager()->getRepository($this->getObjectName()); |
84
|
|
|
} |
85
|
|
|
|
86
|
|
|
abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null); |
87
|
|
|
|
88
|
|
|
public function resetErroneousJobs($workerName = null, $method = null) |
89
|
|
|
{ |
90
|
|
|
$count = $this->countJobsByStatus($this->getArchiveObjectName(), Job::STATUS_ERROR, $workerName, $method); |
91
|
|
|
|
92
|
|
|
$criterion = ['status' => Job::STATUS_ERROR]; |
93
|
|
|
if ($workerName) { |
94
|
|
|
$criterion['workerName'] = $workerName; |
95
|
|
|
} |
96
|
|
|
if ($method) { |
97
|
|
|
$criterion['method'] = $method; |
98
|
|
|
} |
99
|
|
|
|
100
|
|
|
$countProcessed = 0; |
101
|
|
|
$objectManager = $this->getObjectManager(); |
102
|
|
|
for ($i = 0; $i < $count; $i += static::FETCH_COUNT) { |
103
|
|
|
$countProcessed += $this->resetJobsByCriterion( |
104
|
|
|
$criterion, static::FETCH_COUNT, $i); |
105
|
|
|
$objectManager->flush(); |
106
|
|
|
} |
107
|
|
|
|
108
|
|
|
return $countProcessed; |
109
|
|
|
} |
110
|
|
|
|
111
|
|
|
protected function getStalledJobs($workerName = null, $method = null) |
112
|
|
|
{ |
113
|
|
|
$count = $this->countJobsByStatus($this->getObjectName(), Job::STATUS_RUNNING, $workerName, $method); |
114
|
|
|
|
115
|
|
|
$criterion = ['status' => Job::STATUS_RUNNING]; |
116
|
|
|
if ($workerName) { |
117
|
|
|
$criterion['workerName'] = $workerName; |
118
|
|
|
} |
119
|
|
|
if ($method) { |
120
|
|
|
$criterion['method'] = $method; |
121
|
|
|
} |
122
|
|
|
|
123
|
|
|
$runningJobsById = []; |
124
|
|
|
|
125
|
|
|
$objectManager = $this->getObjectManager(); |
126
|
|
|
$runRepository = $objectManager->getRepository($this->runClass); |
127
|
|
|
$runArchiveRepository = $objectManager->getRepository($this->runArchiveClass); |
128
|
|
|
$repository = $this->getRepository(); |
129
|
|
|
|
130
|
|
|
for ($i = 0; $i < $count; $i += static::FETCH_COUNT) { |
131
|
|
|
$runningJobs = $repository->findBy($criterion); |
132
|
|
|
if ($runningJobs) { |
|
|
|
|
133
|
|
|
foreach ($runningJobs as $job) { |
134
|
|
|
/** Job $job */ |
135
|
|
|
if (null !== $runId = $job->getRunId()) { |
136
|
|
|
$runningJobsById[$runId][] = $job; |
137
|
|
|
} |
138
|
|
|
} |
139
|
|
|
} |
140
|
|
|
} |
141
|
|
|
|
142
|
|
|
$stalledJobs = []; |
143
|
|
|
foreach (array_keys($runningJobsById) as $runId) { |
144
|
|
|
if ($runRepository->find($runId)) { |
145
|
|
|
continue; |
146
|
|
|
} |
147
|
|
|
/** @var Run $run */ |
148
|
|
|
if ($run = $runArchiveRepository->find($runId)) { |
|
|
|
|
149
|
|
|
if ($endTime = $run->getEndedAt()) { |
150
|
|
|
// Did it end over an hour ago |
151
|
|
|
if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) { |
152
|
|
|
$stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]); |
153
|
|
|
} |
154
|
|
|
} |
155
|
|
|
} |
156
|
|
|
} |
157
|
|
|
|
158
|
|
|
return $stalledJobs; |
159
|
|
|
} |
160
|
|
|
|
161
|
|
|
public function resetStalledJobs($workerName = null, $method = null) |
162
|
|
|
{ |
163
|
|
|
$stalledJobs = $this->getStalledJobs($workerName, $method); |
164
|
|
|
|
165
|
|
|
$objectManager = $this->getObjectManager(); |
166
|
|
|
|
167
|
|
|
$countProcessed = 0; |
168
|
|
|
for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) { |
169
|
|
|
for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j ) { |
170
|
|
|
$job = $stalledJobs[$j]; |
171
|
|
|
/* Job $job */ |
172
|
|
|
$job->setStatus(Job::STATUS_NEW); |
173
|
|
|
$job->setLocked(null); |
174
|
|
|
$job->setLockedAt(null); |
175
|
|
|
$objectManager->persist($job); |
176
|
|
|
++$countProcessed; |
177
|
|
|
} |
178
|
|
|
$objectManager->flush(); |
179
|
|
|
} |
180
|
|
|
|
181
|
|
|
return $countProcessed; |
182
|
|
|
} |
183
|
|
|
|
184
|
|
|
public function pruneStalledJobs($workerName = null, $method = null) |
185
|
|
|
{ |
186
|
|
|
$stalledJobs = $this->getStalledJobs($workerName, $method); |
187
|
|
|
$objectManager = $this->getObjectManager(); |
188
|
|
|
|
189
|
|
|
$countProcessed = 0; |
190
|
|
|
for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) { |
191
|
|
|
for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j ) { |
192
|
|
|
$job = $stalledJobs[$j]; |
193
|
|
|
$objectManager->remove($job); |
194
|
|
|
++$countProcessed; |
195
|
|
|
} |
196
|
|
|
$objectManager->flush(); |
197
|
|
|
} |
198
|
|
|
|
199
|
|
|
return $countProcessed; |
200
|
|
|
} |
201
|
|
|
|
202
|
|
|
public function deleteJob(\Dtc\QueueBundle\Model\Job $job) |
203
|
|
|
{ |
204
|
|
|
$objectManager = $this->getObjectManager(); |
205
|
|
|
$objectManager->remove($job); |
206
|
|
|
$objectManager->flush(); |
207
|
|
|
} |
208
|
|
|
|
209
|
|
|
public function saveHistory(\Dtc\QueueBundle\Model\Job $job) |
210
|
|
|
{ |
211
|
|
|
$this->deleteJob($job); // Should cause job to be archived |
212
|
|
|
} |
213
|
|
|
|
214
|
|
|
public function save(\Dtc\QueueBundle\Model\Job $job) |
215
|
|
|
{ |
216
|
|
|
// Todo: Serialize args |
217
|
|
|
|
218
|
|
|
// Generate crc hash for the job |
219
|
|
|
$hashValues = array($job->getClassName(), $job->getMethod(), $job->getWorkerName(), $job->getArgs()); |
220
|
|
|
$crcHash = hash('sha256', serialize($hashValues)); |
221
|
|
|
$job->setCrcHash($crcHash); |
222
|
|
|
$objectManager = $this->getObjectManager(); |
223
|
|
|
|
224
|
|
|
if (true === $job->getBatch()) { |
225
|
|
|
// See if similar job that hasn't run exists |
226
|
|
|
$criteria = array('crcHash' => $crcHash, 'status' => Job::STATUS_NEW); |
227
|
|
|
$oldJob = $this->getRepository()->findOneBy($criteria); |
228
|
|
|
|
229
|
|
|
if ($oldJob) { |
230
|
|
|
// Old job exists - just override fields Set higher priority |
231
|
|
|
$oldJob->setPriority(max($job->getPriority(), $oldJob->getPriority())); |
232
|
|
|
$oldJob->setWhenAt(min($job->getWhenAt(), $oldJob->getWhenAt())); |
233
|
|
|
$oldJob->setBatch(true); |
234
|
|
|
$oldJob->setUpdatedAt(new \DateTime()); |
235
|
|
|
$objectManager->persist($oldJob); |
236
|
|
|
$objectManager->flush(); |
237
|
|
|
|
238
|
|
|
return $oldJob; |
239
|
|
|
} |
240
|
|
|
} |
241
|
|
|
|
242
|
|
|
// Just save a new job |
243
|
|
|
$objectManager->persist($job); |
244
|
|
|
$objectManager->flush(); |
245
|
|
|
|
246
|
|
|
return $job; |
247
|
|
|
} |
248
|
|
|
|
249
|
|
|
abstract protected function stopIdGenerator($objectName); |
250
|
|
|
|
251
|
|
|
private function resetJobsByCriterion( |
252
|
|
|
$criterion, |
253
|
|
|
$limit, |
254
|
|
|
$offset) |
255
|
|
|
{ |
256
|
|
|
$objectManager = $this->getObjectManager(); |
257
|
|
|
$objectName = $this->getObjectName(); |
258
|
|
|
$archiveObjectName = $this->getArchiveObjectName(); |
259
|
|
|
$jobRepository = $objectManager->getRepository($objectName); |
260
|
|
|
$jobArchiveRepository = $objectManager->getRepository($archiveObjectName); |
261
|
|
|
$className = $jobRepository->getClassName(); |
262
|
|
|
$metadata = $objectManager->getClassMetadata($className); |
263
|
|
|
|
264
|
|
|
$this->stopIdGenerator($archiveObjectName); |
265
|
|
|
$identifierData = $metadata->getIdentifier(); |
266
|
|
|
$idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id'; |
267
|
|
|
$results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset); |
268
|
|
|
$countProcessed = 0; |
269
|
|
|
|
270
|
|
|
foreach ($results as $jobArchive) { |
271
|
|
|
/** @var Job $job */ |
272
|
|
|
$job = new $className(); |
273
|
|
|
Util::copy($jobArchive, $job); |
274
|
|
|
$job->setStatus(Job::STATUS_NEW); |
275
|
|
|
$job->setLocked(null); |
276
|
|
|
$job->setLockedAt(null); |
277
|
|
|
$job->setMessage(null); |
278
|
|
|
$job->setUpdatedAt(new \DateTime()); |
279
|
|
|
$job->setFinishedAt(null); |
|
|
|
|
280
|
|
|
$job->setElapsed(null); |
281
|
|
|
|
282
|
|
|
$objectManager->persist($job); |
283
|
|
|
$objectManager->remove($jobArchive); |
284
|
|
|
++$countProcessed; |
285
|
|
|
} |
286
|
|
|
|
287
|
|
|
return $countProcessed; |
288
|
|
|
} |
289
|
|
|
} |
290
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.