1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\ORM; |
4
|
|
|
|
5
|
|
|
use Doctrine\DBAL\LockMode; |
6
|
|
|
use Doctrine\ORM\EntityManager; |
7
|
|
|
use Doctrine\ORM\EntityRepository; |
8
|
|
|
use Doctrine\ORM\Id\AssignedGenerator; |
9
|
|
|
use Doctrine\ORM\Mapping\ClassMetadata; |
10
|
|
|
use Doctrine\ORM\QueryBuilder; |
11
|
|
|
use Dtc\QueueBundle\Doctrine\BaseJobManager; |
12
|
|
|
use Dtc\QueueBundle\Entity\Job; |
13
|
|
|
use Dtc\QueueBundle\Model\BaseJob; |
14
|
|
|
|
15
|
|
|
class JobManager extends BaseJobManager |
16
|
|
|
{ |
17
|
|
|
protected $formerIdGenerators; |
18
|
|
|
|
19
|
8 |
|
public function stopIdGenerator($objectName) |
20
|
|
|
{ |
21
|
8 |
|
$objectManager = $this->getObjectManager(); |
22
|
8 |
|
$repository = $objectManager->getRepository($objectName); |
23
|
|
|
/** @var ClassMetadata $metadata */ |
24
|
8 |
|
$metadata = $objectManager->getClassMetadata($repository->getClassName()); |
25
|
8 |
|
$this->formerIdGenerators[$objectName]['generator'] = $metadata->idGenerator; |
26
|
8 |
|
$this->formerIdGenerators[$objectName]['type'] = $metadata->generatorType; |
27
|
8 |
|
$metadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE); |
28
|
8 |
|
$metadata->setIdGenerator(new AssignedGenerator()); |
29
|
8 |
|
} |
30
|
|
|
|
31
|
1 |
|
public function restoreIdGenerator($objectName) |
32
|
|
|
{ |
33
|
1 |
|
$objectManager = $this->getObjectManager(); |
34
|
1 |
|
$repository = $objectManager->getRepository($objectName); |
35
|
|
|
/** @var ClassMetadata $metadata */ |
36
|
1 |
|
$metadata = $objectManager->getClassMetadata($repository->getClassName()); |
37
|
1 |
|
$generator = $this->formerIdGenerators[$objectName]['generator']; |
38
|
1 |
|
$type = $this->formerIdGenerators[$objectName]['type']; |
39
|
1 |
|
$metadata->setIdGeneratorType($type); |
40
|
1 |
|
$metadata->setIdGenerator($generator); |
41
|
1 |
|
} |
42
|
|
|
|
43
|
3 |
|
public function countJobsByStatus($objectName, $status, $workerName = null, $method = null) |
44
|
|
|
{ |
45
|
|
|
/** @var EntityManager $objectManager */ |
46
|
3 |
|
$objectManager = $this->getObjectManager(); |
47
|
|
|
|
48
|
|
|
$qb = $objectManager |
49
|
3 |
|
->createQueryBuilder() |
50
|
3 |
|
->select('count(a.id)') |
51
|
3 |
|
->from($objectName, 'a') |
52
|
3 |
|
->where('a.status = :status'); |
53
|
|
|
|
54
|
3 |
|
if (null !== $workerName) { |
55
|
1 |
|
$qb->andWhere('a.workerName = :workerName') |
56
|
1 |
|
->setParameter(':workerName', $workerName); |
57
|
1 |
|
} |
58
|
|
|
|
59
|
3 |
|
if (null !== $method) { |
60
|
1 |
|
$qb->andWhere('a.method = :method') |
61
|
1 |
|
->setParameter(':method', $workerName); |
62
|
1 |
|
} |
63
|
|
|
|
64
|
3 |
|
$count = $qb->setParameter(':status', $status) |
65
|
3 |
|
->getQuery()->getSingleScalarResult(); |
66
|
|
|
|
67
|
3 |
|
if (!$count) { |
68
|
1 |
|
return 0; |
69
|
|
|
} |
70
|
|
|
|
71
|
3 |
|
return $count; |
72
|
1 |
|
} |
73
|
|
|
|
74
|
|
|
/** |
75
|
|
|
* @param string|null $workerName |
76
|
|
|
* @param string|null $method |
77
|
|
|
* |
78
|
|
|
* @return int Count of jobs pruned |
79
|
|
|
*/ |
80
|
|
|
public function pruneErroneousJobs($workerName = null, $method = null) |
81
|
|
|
{ |
82
|
1 |
|
return $this->pruneJobs($workerName, $method, $this->getArchiveObjectName(), function ($qb) { |
83
|
|
|
/* @var QueryBuilder $qb */ |
84
|
1 |
|
$qb->where('j.status = :status') |
85
|
1 |
|
->setParameter(':status', BaseJob::STATUS_ERROR); |
86
|
1 |
|
}); |
87
|
|
|
} |
88
|
|
|
|
89
|
|
|
/** |
90
|
|
|
* Prunes jobs according to a condition function. |
91
|
|
|
* |
92
|
|
|
* @param string|null $workerName |
93
|
|
|
* @param string|null $method |
94
|
|
|
* @param $conditionFunc |
95
|
|
|
* |
96
|
|
|
* @return int Count of jobs pruned |
97
|
|
|
*/ |
98
|
1 |
View Code Duplication |
protected function pruneJobs($workerName = null, $method = null, $objectName, $conditionFunc) |
|
|
|
|
99
|
|
|
{ |
100
|
|
|
/** @var EntityManager $objectManager */ |
101
|
1 |
|
$objectManager = $this->getObjectManager(); |
102
|
1 |
|
$qb = $objectManager->createQueryBuilder()->delete($objectName, 'j'); |
103
|
1 |
|
$conditionFunc($qb); |
104
|
|
|
|
105
|
1 |
|
$this->addWorkerNameCriterion($qb, $workerName, $method); |
106
|
1 |
|
$query = $qb->getQuery(); |
107
|
|
|
|
108
|
1 |
|
return intval($query->execute()); |
109
|
|
|
} |
110
|
|
|
|
111
|
7 |
|
protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null) |
112
|
|
|
{ |
113
|
7 |
|
if (null !== $workerName) { |
114
|
3 |
|
$queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName); |
115
|
3 |
|
} |
116
|
|
|
|
117
|
7 |
|
if (null !== $method) { |
118
|
2 |
|
$queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method); |
119
|
2 |
|
} |
120
|
7 |
|
} |
121
|
|
|
|
122
|
1 |
|
protected function updateExpired($workerName = null, $method = null) |
123
|
|
|
{ |
124
|
|
|
/** @var EntityManager $objectManager */ |
125
|
1 |
|
$objectManager = $this->getObjectManager(); |
126
|
1 |
|
$qb = $objectManager->createQueryBuilder()->update($this->getObjectName(), 'j'); |
127
|
1 |
|
$qb->set('j.status', ':newStatus'); |
128
|
1 |
|
$qb->where('j.expiresAt <= :expiresAt') |
129
|
1 |
|
->setParameter(':expiresAt', new \DateTime()); |
130
|
1 |
|
$qb->andWhere('j.status = :status') |
131
|
1 |
|
->setParameter(':status', BaseJob::STATUS_NEW) |
132
|
1 |
|
->setParameter(':newStatus', Job::STATUS_EXPIRED); |
133
|
|
|
|
134
|
1 |
|
$this->addWorkerNameCriterion($qb, $workerName, $method); |
135
|
1 |
|
$query = $qb->getQuery(); |
136
|
|
|
|
137
|
1 |
|
return intval($query->execute()); |
138
|
|
|
} |
139
|
|
|
|
140
|
|
|
/** |
141
|
|
|
* Removes archived jobs older than $olderThan. |
142
|
|
|
* |
143
|
|
|
* @param \DateTime $olderThan |
144
|
|
|
*/ |
145
|
1 |
|
public function pruneArchivedJobs(\DateTime $olderThan) |
146
|
|
|
{ |
147
|
|
|
/** @var EntityManager $objectManager */ |
148
|
1 |
|
$objectManager = $this->getObjectManager(); |
149
|
1 |
|
$qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j'); |
150
|
|
|
$qb = $qb |
151
|
1 |
|
->where('j.updatedAt < :updatedAt') |
152
|
1 |
|
->setParameter(':updatedAt', $olderThan); |
153
|
|
|
|
154
|
1 |
|
$query = $qb->getQuery(); |
155
|
|
|
|
156
|
1 |
|
return $query->execute(); |
157
|
|
|
} |
158
|
|
|
|
159
|
1 |
|
public function getJobCount($workerName = null, $method = null) |
160
|
|
|
{ |
161
|
|
|
/** @var EntityManager $objectManager */ |
162
|
1 |
|
$objectManager = $this->getObjectManager(); |
163
|
1 |
|
$qb = $objectManager->createQueryBuilder(); |
164
|
|
|
|
165
|
1 |
|
$qb = $qb->select('count(j)')->from($this->getObjectName(), 'j'); |
166
|
|
|
|
167
|
1 |
|
$where = 'where'; |
168
|
1 |
|
if (null !== $workerName) { |
169
|
|
|
if (null !== $method) { |
170
|
|
|
$qb->where($qb->expr()->andX( |
171
|
|
|
$qb->expr()->eq('j.workerName', ':workerName'), |
172
|
|
|
$qb->expr()->eq('j.method', ':method') |
173
|
1 |
|
)) |
174
|
|
|
->setParameter(':method', $method); |
175
|
|
|
} else { |
176
|
|
|
$qb->where('j.workerName = :workerName'); |
177
|
|
|
} |
178
|
|
|
$qb->setParameter(':workerName', $workerName); |
179
|
|
|
$where = 'andWhere'; |
180
|
1 |
|
} elseif (null !== $method) { |
181
|
|
|
$qb->where('j.method = :method')->setParameter(':method', $method); |
182
|
|
|
$where = 'andWhere'; |
183
|
|
|
} |
184
|
|
|
|
185
|
1 |
|
$dateTime = new \DateTime(); |
186
|
|
|
// Filter |
187
|
|
|
$qb |
188
|
1 |
|
->$where($qb->expr()->orX( |
189
|
1 |
|
$qb->expr()->isNull('j.whenAt'), |
190
|
1 |
|
$qb->expr()->lte('j.whenAt', ':whenAt') |
191
|
1 |
|
)) |
192
|
1 |
|
->andWhere($qb->expr()->orX( |
193
|
1 |
|
$qb->expr()->isNull('j.expiresAt'), |
194
|
1 |
|
$qb->expr()->gt('j.expiresAt', ':expiresAt') |
195
|
1 |
|
)) |
196
|
1 |
|
->andWhere('j.locked is NULL') |
197
|
1 |
|
->setParameter(':whenAt', $dateTime) |
198
|
1 |
|
->setParameter(':expiresAt', $dateTime); |
199
|
|
|
|
200
|
1 |
|
$query = $qb->getQuery(); |
201
|
|
|
|
202
|
1 |
|
return $query->getSingleScalarResult(); |
203
|
|
|
} |
204
|
|
|
|
205
|
|
|
/** |
206
|
|
|
* Get Jobs statuses. |
207
|
|
|
*/ |
208
|
|
|
public function getStatus() |
209
|
|
|
{ |
210
|
|
|
$result = []; |
211
|
|
|
$this->getStatusByEntityName($this->getObjectName(), $result); |
212
|
|
|
$this->getStatusByEntityName($this->getObjectName(), $result); |
213
|
|
|
|
214
|
|
|
$finalResult = []; |
215
|
|
|
foreach ($result as $key => $item) { |
216
|
|
|
ksort($item); |
217
|
|
|
$finalResult[$key] = $item; |
218
|
|
|
} |
219
|
|
|
|
220
|
|
|
return $finalResult; |
221
|
|
|
} |
222
|
|
|
|
223
|
|
|
/** |
224
|
|
|
* @param string $entityName |
225
|
|
|
*/ |
226
|
|
|
protected function getStatusByEntityName($entityName, array &$result) |
227
|
|
|
{ |
228
|
|
|
/** @var EntityManager $objectManager */ |
229
|
|
|
$objectManager = $this->getObjectManager(); |
230
|
|
|
$result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c') |
231
|
|
|
->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult(); |
232
|
|
|
|
233
|
|
|
foreach ($result1 as $item) { |
234
|
|
|
$method = $item['workerName'].'->'.$item['method']; |
235
|
|
|
if (!isset($result[$method])) { |
236
|
|
|
$result[$method] = [BaseJob::STATUS_NEW => 0, |
237
|
|
|
BaseJob::STATUS_RUNNING => 0, |
238
|
|
|
BaseJob::STATUS_SUCCESS => 0, |
239
|
|
|
BaseJob::STATUS_ERROR => 0, ]; |
240
|
|
|
} |
241
|
|
|
$result[$method][$item['status']] += intval($item['c']); |
242
|
|
|
} |
243
|
|
|
} |
244
|
|
|
|
245
|
|
|
/** |
246
|
|
|
* Get the next job to run (can be filtered by workername and method name). |
247
|
|
|
* |
248
|
|
|
* @param string $workerName |
249
|
|
|
* @param string $methodName |
250
|
|
|
* @param bool $prioritize |
251
|
|
|
* |
252
|
|
|
* @return Job|null |
253
|
|
|
*/ |
254
|
5 |
|
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
255
|
|
|
{ |
256
|
5 |
|
$uniqid = uniqid(gethostname().'-'.getmypid(), true); |
257
|
5 |
|
$hash = hash('sha256', $uniqid); |
258
|
|
|
|
259
|
|
|
/** @var EntityManager $objectManager */ |
260
|
5 |
|
$objectManager = $this->getObjectManager(); |
261
|
|
|
|
262
|
5 |
|
$objectManager->beginTransaction(); |
263
|
|
|
|
264
|
|
|
/** @var EntityRepository $repository */ |
265
|
5 |
|
$repository = $this->getRepository(); |
266
|
5 |
|
$qb = $repository->createQueryBuilder('j'); |
267
|
5 |
|
$dateTime = new \DateTime(); |
268
|
|
|
$qb |
269
|
5 |
|
->select('j') |
270
|
5 |
|
->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW) |
271
|
5 |
|
->andWhere('j.locked is NULL') |
272
|
5 |
|
->andWhere($qb->expr()->orX( |
273
|
5 |
|
$qb->expr()->isNull('j.whenAt'), |
274
|
5 |
|
$qb->expr()->lte('j.whenAt', ':whenAt') |
275
|
5 |
|
)) |
276
|
5 |
|
->andWhere($qb->expr()->orX( |
277
|
5 |
|
$qb->expr()->isNull('j.expiresAt'), |
278
|
5 |
|
$qb->expr()->gt('j.expiresAt', ':expiresAt') |
279
|
5 |
|
)) |
280
|
5 |
|
->setParameter(':whenAt', $dateTime) |
281
|
5 |
|
->setParameter(':expiresAt', $dateTime); |
282
|
|
|
|
283
|
5 |
|
$this->addWorkerNameCriterion($qb, $workerName, $methodName); |
284
|
|
|
|
285
|
5 |
|
if ($prioritize) { |
286
|
5 |
|
$qb->add('orderBy', 'j.priority ASC, j.whenAt ASC'); |
|
|
|
|
287
|
5 |
|
} else { |
288
|
|
|
$qb->orderBy('j.whenAt', 'ASC'); |
289
|
|
|
} |
290
|
5 |
|
$qb->setMaxResults(1); |
291
|
|
|
|
292
|
|
|
/** @var QueryBuilder $qb */ |
293
|
5 |
|
$query = $qb->getQuery(); |
294
|
5 |
|
$query->setLockMode(LockMode::PESSIMISTIC_WRITE); |
295
|
5 |
|
$jobs = $query->getResult(); |
296
|
|
|
|
297
|
5 |
|
if ($jobs) { |
298
|
|
|
/** @var Job $job */ |
299
|
4 |
|
$job = $jobs[0]; |
300
|
4 |
|
if (!$job) { |
301
|
|
|
throw new \Exception("No job found for $hash, even though last result was count ".count($jobs)); |
302
|
|
|
} |
303
|
4 |
|
$job->setLocked(true); |
304
|
4 |
|
$job->setLockedAt(new \DateTime()); |
305
|
4 |
|
$job->setStatus(BaseJob::STATUS_RUNNING); |
306
|
4 |
|
$job->setRunId($runId); |
307
|
4 |
|
$objectManager->commit(); |
308
|
4 |
|
$objectManager->flush(); |
309
|
|
|
|
310
|
4 |
|
return $job; |
311
|
|
|
} |
312
|
|
|
|
313
|
2 |
|
$objectManager->rollback(); |
314
|
|
|
|
315
|
2 |
|
return null; |
316
|
|
|
} |
317
|
|
|
} |
318
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.