1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\ORM; |
4
|
|
|
|
5
|
|
|
use Doctrine\DBAL\LockMode; |
6
|
|
|
use Doctrine\ORM\EntityManager; |
7
|
|
|
use Doctrine\ORM\EntityRepository; |
8
|
|
|
use Doctrine\ORM\Id\AssignedGenerator; |
9
|
|
|
use Doctrine\ORM\Mapping\ClassMetadata; |
10
|
|
|
use Doctrine\ORM\QueryBuilder; |
11
|
|
|
use Dtc\QueueBundle\Doctrine\BaseJobManager; |
12
|
|
|
use Dtc\QueueBundle\Entity\Job; |
13
|
|
|
|
14
|
|
|
class JobManager extends BaseJobManager |
15
|
|
|
{ |
16
|
|
|
public function stopIdGenerator($objectName) |
17
|
|
|
{ |
18
|
|
|
$objectManager = $this->getObjectManager(); |
19
|
|
|
$repository = $objectManager->getRepository($objectName); |
20
|
|
|
/** @var ClassMetadata $metadata */ |
21
|
|
|
$metadata = $objectManager->getClassMetadata($repository->getClassName()); |
22
|
|
|
$metadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE); |
23
|
|
|
$metadata->setIdGenerator(new AssignedGenerator()); |
24
|
|
|
} |
25
|
|
|
|
26
|
|
|
public function countJobsByStatus($objectName, $status, $workerName = null, $method = null) |
27
|
|
|
{ |
28
|
|
|
/** @var EntityManager $objectManager */ |
29
|
|
|
$objectManager = $this->getObjectManager(); |
30
|
|
|
|
31
|
|
|
$qb = $objectManager |
32
|
|
|
->createQueryBuilder() |
33
|
|
|
->select('count(a.id)') |
34
|
|
|
->from($objectName, 'a') |
35
|
|
|
->where('a.status = :status'); |
36
|
|
|
|
37
|
|
|
if ($workerName) { |
38
|
|
|
$qb->andWhere('a.workerName = :workerName') |
39
|
|
|
->setParameter(':workerName', $workerName); |
40
|
|
|
} |
41
|
|
|
|
42
|
|
|
if ($method) { |
43
|
|
|
$qb->andWhere('a.method = :method') |
44
|
|
|
->setParameter(':method', $workerName); |
45
|
|
|
} |
46
|
|
|
|
47
|
|
|
$count = $qb->setParameter(':status', $status) |
48
|
|
|
->getQuery()->getSingleScalarResult(); |
49
|
|
|
|
50
|
|
|
if (!$count) { |
51
|
|
|
return 0; |
52
|
|
|
} |
53
|
|
|
|
54
|
|
|
return $count; |
55
|
|
|
} |
56
|
|
|
|
57
|
|
View Code Duplication |
public function pruneErroneousJobs($workerName = null, $method = null) |
|
|
|
|
58
|
|
|
{ |
59
|
|
|
/** @var EntityManager $objectManager */ |
60
|
|
|
$objectManager = $this->getObjectManager(); |
61
|
|
|
$qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j'); |
62
|
|
|
$qb = $qb |
63
|
|
|
->where('j.status = :status') |
64
|
|
|
->setParameter(':status', Job::STATUS_ERROR); |
65
|
|
|
|
66
|
|
|
if ($workerName) { |
67
|
|
|
$qb->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName); |
68
|
|
|
} |
69
|
|
|
|
70
|
|
|
if ($method) { |
71
|
|
|
$qb->andWhere('j.method = :method')->setParameter(':method', $method); |
72
|
|
|
} |
73
|
|
|
|
74
|
|
|
$query = $qb->getQuery(); |
75
|
|
|
|
76
|
|
|
return $query->execute(); |
77
|
|
|
} |
78
|
|
|
|
79
|
|
View Code Duplication |
public function pruneExpiredJobs($workerName = null, $method = null) |
|
|
|
|
80
|
|
|
{ |
81
|
|
|
/** @var EntityManager $objectManager */ |
82
|
|
|
$objectManager = $this->getObjectManager(); |
83
|
|
|
$qb = $objectManager->createQueryBuilder()->delete($this->getObjectName(), 'j'); |
84
|
|
|
$qb = $qb |
85
|
|
|
->where('j.expiresAt <= :expiresAt') |
86
|
|
|
->setParameter(':expiresAt', new \DateTime()); |
87
|
|
|
|
88
|
|
|
if ($workerName) { |
89
|
|
|
$qb->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName); |
90
|
|
|
} |
91
|
|
|
|
92
|
|
|
if ($method) { |
93
|
|
|
$qb->andWhere('j.method = :method')->setParameter(':method', $method); |
94
|
|
|
} |
95
|
|
|
|
96
|
|
|
$query = $qb->getQuery(); |
97
|
|
|
|
98
|
|
|
return $query->execute(); |
99
|
|
|
} |
100
|
|
|
|
101
|
|
|
/** |
102
|
|
|
* Removes archived jobs older than $olderThan. |
103
|
|
|
* |
104
|
|
|
* @param \DateTime $olderThan |
105
|
|
|
*/ |
106
|
|
|
public function pruneArchivedJobs(\DateTime $olderThan) |
107
|
|
|
{ |
108
|
|
|
/** @var EntityManager $objectManager */ |
109
|
|
|
$objectManager = $this->getObjectManager(); |
110
|
|
|
$qb = $objectManager->createQueryBuilder()->delete($this->getArchiveObjectName(), 'j'); |
111
|
|
|
$qb = $qb |
112
|
|
|
->where('j.updatedAt < :updatedAt') |
113
|
|
|
->setParameter(':updatedAt', $olderThan); |
114
|
|
|
|
115
|
|
|
$query = $qb->getQuery(); |
116
|
|
|
|
117
|
|
|
return $query->execute(); |
118
|
|
|
} |
119
|
|
|
|
120
|
|
|
public function getJobCount($workerName = null, $method = null) |
121
|
|
|
{ |
122
|
|
|
/** @var EntityManager $objectManager */ |
123
|
|
|
$objectManager = $this->getObjectManager(); |
124
|
|
|
$qb = $objectManager->createQueryBuilder('j'); |
|
|
|
|
125
|
|
|
|
126
|
|
|
$qb = $qb->select('count(j)')->from($this->getObjectName(), 'j'); |
127
|
|
|
|
128
|
|
|
$where = 'where'; |
129
|
|
|
if ($workerName) { |
130
|
|
|
if ($method) { |
131
|
|
|
$qb->where($qb->expr()->andX( |
132
|
|
|
$qb->expr()->eq('j.workerName', ':workerName'), |
133
|
|
|
$qb->expr()->eq('j.method', ':method') |
134
|
|
|
)) |
135
|
|
|
->setParameter(':method', $method); |
136
|
|
|
} else { |
137
|
|
|
$qb->where('j.workerName = :workerName'); |
138
|
|
|
} |
139
|
|
|
$qb->setParameter(':workerName', $workerName); |
140
|
|
|
$where = 'andWhere'; |
141
|
|
|
} elseif ($method) { |
142
|
|
|
$qb->where('j.method = :method')->setParameter(':method', $method); |
143
|
|
|
$where = 'andWhere'; |
144
|
|
|
} |
145
|
|
|
|
146
|
|
|
$dateTime = new \DateTime(); |
147
|
|
|
// Filter |
148
|
|
|
$qb |
149
|
|
|
->$where($qb->expr()->orX( |
150
|
|
|
$qb->expr()->isNull('j.whenAt'), |
151
|
|
|
$qb->expr()->lte('j.whenAt', ':whenAt') |
152
|
|
|
)) |
153
|
|
|
->andWhere($qb->expr()->orX( |
154
|
|
|
$qb->expr()->isNull('j.expiresAt'), |
155
|
|
|
$qb->expr()->gt('j.expiresAt', ':expiresAt') |
156
|
|
|
)) |
157
|
|
|
->andWhere('j.locked is NULL') |
158
|
|
|
->setParameter(':whenAt', $dateTime) |
159
|
|
|
->setParameter(':expiresAt', $dateTime); |
160
|
|
|
|
161
|
|
|
$query = $qb->getQuery(); |
162
|
|
|
|
163
|
|
|
return $query->getSingleScalarResult(); |
164
|
|
|
} |
165
|
|
|
|
166
|
|
|
/** |
167
|
|
|
* Get Jobs statuses. |
168
|
|
|
*/ |
169
|
|
|
public function getStatus() |
170
|
|
|
{ |
171
|
|
|
$result = []; |
172
|
|
|
$this->getStatusByEntityName($this->getObjectName(), $result); |
173
|
|
|
$this->getStatusByEntityName($this->getObjectName(), $result); |
174
|
|
|
|
175
|
|
|
$finalResult = []; |
176
|
|
|
foreach ($result as $key => $item) { |
177
|
|
|
ksort($item); |
178
|
|
|
$finalResult[$key] = $item; |
179
|
|
|
} |
180
|
|
|
|
181
|
|
|
return $finalResult; |
182
|
|
|
} |
183
|
|
|
|
184
|
|
|
protected function getStatusByEntityName($entityName, array &$result) |
185
|
|
|
{ |
186
|
|
|
/** @var EntityManager $objectManager */ |
187
|
|
|
$objectManager = $this->getObjectManager(); |
188
|
|
|
$result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c') |
189
|
|
|
->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult(); |
190
|
|
|
|
191
|
|
|
foreach ($result1 as $item) { |
192
|
|
|
$method = $item['workerName'].'->'.$item['method']; |
193
|
|
|
if (!isset($result[$method])) { |
194
|
|
|
$result[$method] = [Job::STATUS_NEW => 0, |
195
|
|
|
Job::STATUS_RUNNING => 0, |
196
|
|
|
Job::STATUS_SUCCESS => 0, |
197
|
|
|
Job::STATUS_ERROR => 0, ]; |
198
|
|
|
} |
199
|
|
|
$result[$method][$item['status']] += intval($item['c']); |
200
|
|
|
} |
201
|
|
|
} |
202
|
|
|
|
203
|
|
|
/** |
204
|
|
|
* Get the next job to run (can be filtered by workername and method name). |
205
|
|
|
* |
206
|
|
|
* @param string $workerName |
207
|
|
|
* @param string $methodName |
208
|
|
|
* @param bool $prioritize |
209
|
|
|
* |
210
|
|
|
* @return \Dtc\QueueBundle\Model\Job|null |
211
|
|
|
*/ |
212
|
|
|
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
213
|
|
|
{ |
214
|
|
|
$uniqid = uniqid(gethostname().'-'.getmypid(), true); |
215
|
|
|
$hash = hash('sha256', $uniqid); |
216
|
|
|
|
217
|
|
|
/** @var EntityManager $objectManager */ |
218
|
|
|
$objectManager = $this->getObjectManager(); |
219
|
|
|
|
220
|
|
|
$objectManager->beginTransaction(); |
221
|
|
|
|
222
|
|
|
/** @var EntityRepository $repository */ |
223
|
|
|
$repository = $this->getRepository(); |
224
|
|
|
$qb = $repository->createQueryBuilder('j'); |
225
|
|
|
$dateTime = new \DateTime(); |
226
|
|
|
$qb |
227
|
|
|
->select('j') |
228
|
|
|
->where('j.status = :status')->setParameter(':status', Job::STATUS_NEW) |
229
|
|
|
->andWhere('j.locked is NULL') |
230
|
|
|
->andWhere($qb->expr()->orX( |
231
|
|
|
$qb->expr()->isNull('j.whenAt'), |
232
|
|
|
$qb->expr()->lte('j.whenAt', ':whenAt') |
233
|
|
|
)) |
234
|
|
|
->andWhere($qb->expr()->orX( |
235
|
|
|
$qb->expr()->isNull('j.expiresAt'), |
236
|
|
|
$qb->expr()->gt('j.expiresAt', ':expiresAt') |
237
|
|
|
)) |
238
|
|
|
->setParameter(':whenAt', $dateTime) |
239
|
|
|
->setParameter(':expiresAt', $dateTime); |
240
|
|
|
|
241
|
|
|
if ($workerName) { |
|
|
|
|
242
|
|
|
$qb->andWhere('j.workerName = :workerName') |
243
|
|
|
->setParameter(':workerName', $workerName); |
244
|
|
|
} |
245
|
|
|
|
246
|
|
|
if ($methodName) { |
|
|
|
|
247
|
|
|
$qb->andWhere('j.method = :method') |
248
|
|
|
->setParameter(':method', $methodName); |
249
|
|
|
} |
250
|
|
|
|
251
|
|
|
if ($prioritize) { |
252
|
|
|
$qb->add('orderBy', 'j.priority ASC, j.whenAt ASC'); |
|
|
|
|
253
|
|
|
} else { |
254
|
|
|
$qb->orderBy('j.whenAt', 'ASC'); |
255
|
|
|
} |
256
|
|
|
$qb->setMaxResults(1); |
257
|
|
|
|
258
|
|
|
/** @var QueryBuilder $qb */ |
259
|
|
|
$query = $qb->getQuery(); |
260
|
|
|
$query->setLockMode(LockMode::PESSIMISTIC_WRITE); |
261
|
|
|
$jobs = $query->getResult(); |
262
|
|
|
|
263
|
|
|
if ($jobs) { |
264
|
|
|
/** @var Job $job */ |
265
|
|
|
$job = $jobs[0]; |
266
|
|
|
if (!$job) { |
267
|
|
|
throw new \Exception("No job found for $hash, even though last result was count ".count($jobs)); |
268
|
|
|
} |
269
|
|
|
$job->setLocked(true); |
270
|
|
|
$job->setLockedAt(new \DateTime()); |
271
|
|
|
$job->setStatus(Job::STATUS_RUNNING); |
272
|
|
|
$job->setRunId($runId); |
273
|
|
|
$objectManager->commit(); |
274
|
|
|
$objectManager->flush(); |
275
|
|
|
|
276
|
|
|
return $job; |
277
|
|
|
} |
278
|
|
|
|
279
|
|
|
$objectManager->rollback(); |
280
|
|
|
|
281
|
|
|
return null; |
282
|
|
|
} |
283
|
|
|
} |
284
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.