1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\ORM; |
4
|
|
|
|
5
|
|
|
use Doctrine\DBAL\LockMode; |
6
|
|
|
use Doctrine\ORM\EntityManager; |
7
|
|
|
use Doctrine\ORM\EntityRepository; |
8
|
|
|
use Doctrine\ORM\Id\AssignedGenerator; |
9
|
|
|
use Doctrine\ORM\Mapping\ClassMetadata; |
10
|
|
|
use Doctrine\ORM\QueryBuilder; |
11
|
|
|
use Dtc\QueueBundle\Doctrine\BaseJobManager; |
12
|
|
|
use Dtc\QueueBundle\Entity\Job; |
13
|
|
|
|
14
|
|
|
/**
 * Doctrine ORM backed job manager.
 *
 * Provides counting, pruning, status aggregation and "fetch next job" operations
 * on top of the object manager / repository exposed by BaseJobManager.
 */
class JobManager extends BaseJobManager
{
    /**
     * Disables automatic identifier generation for the given entity.
     *
     * The class metadata of the entity is switched to GENERATOR_TYPE_NONE with an
     * AssignedGenerator, i.e. identifiers have to be assigned by the caller.
     *
     * @param string $objectName Entity name understood by the object manager
     */
    public function stopIdGenerator($objectName)
    {
        $objectManager = $this->getObjectManager();
        $className = $objectManager->getRepository($objectName)->getClassName();

        /** @var ClassMetadata $metadata */
        $metadata = $objectManager->getClassMetadata($className);
        $metadata->setIdGeneratorType(ClassMetadata::GENERATOR_TYPE_NONE);
        $metadata->setIdGenerator(new AssignedGenerator());
    }

    /**
     * Counts the jobs of an entity that are in the given status.
     *
     * @param string      $objectName Entity name to count on
     * @param mixed       $status     Status value to match
     * @param string|null $workerName Optional worker name filter
     * @param string|null $method     Optional method filter
     *
     * @return mixed The scalar count reported by the query, or 0 when it is empty/falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $qb = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status')
            ->setParameter(':status', $status);

        // The method filter is bound to $method here (it used to be bound to $workerName).
        $this->applyWorkerAndMethodFilter($qb, 'a', $workerName, $method);

        $count = $qb->getQuery()->getSingleScalarResult();

        return $count ? $count : 0;
    }

    /**
     * Deletes archived jobs whose status is Job::STATUS_ERROR.
     *
     * @param string $workerName
     * @param string $method
     *
     * @return int Count of jobs pruned
     */
    public function pruneErroneousJobs($workerName = null, $method = null)
    {
        return $this->pruneJobs($workerName, $method, $this->getArchiveObjectName(), function (QueryBuilder $qb) {
            $qb->where('j.status = :status')
                ->setParameter(':status', Job::STATUS_ERROR);
        });
    }

    /**
     * Prunes jobs according to a condition function.
     *
     * The condition function is invoked first with the DELETE query builder (alias "j"),
     * so it may set the base WHERE clause; the worker name / method filters are then
     * appended with AND.
     *
     * Note: the defaults on $workerName and $method have no practical effect because
     * required parameters follow them; all four arguments must be passed. The signature
     * is kept as is for compatibility.
     *
     * @param string|null $workerName
     * @param string|null $method
     * @param string      $objectName    Entity name to delete from
     * @param callable    $conditionFunc Receives the QueryBuilder as its only argument
     *
     * @return int Count of jobs pruned
     */
    protected function pruneJobs($workerName = null, $method = null, $objectName, $conditionFunc)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $qb = $objectManager->createQueryBuilder()->delete($objectName, 'j');
        $conditionFunc($qb);

        $this->applyWorkerAndMethodFilter($qb, 'j', $workerName, $method);

        return intval($qb->getQuery()->execute());
    }

    /**
     * Deletes jobs whose expiresAt is at or before the current time.
     *
     * @param string $workerName
     * @param string $method
     *
     * @return int Count of jobs pruned
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        return $this->pruneJobs($workerName, $method, $this->getObjectName(), function (QueryBuilder $qb) {
            $qb->where('j.expiresAt <= :expiresAt')
                ->setParameter(':expiresAt', new \DateTime());
        });
    }

    /**
     * Removes archived jobs older than $olderThan.
     *
     * @param \DateTime $olderThan Archived jobs with updatedAt strictly before this are deleted
     *
     * @return mixed Result of executing the DELETE query (presumably the number of rows removed)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $query = $objectManager
            ->createQueryBuilder()
            ->delete($this->getArchiveObjectName(), 'j')
            ->where('j.updatedAt < :updatedAt')
            ->setParameter(':updatedAt', $olderThan)
            ->getQuery();

        return $query->execute();
    }

    /**
     * Counts the jobs that are currently eligible to run.
     *
     * A job is counted when it is not locked, its whenAt is unset or not in the future,
     * and its expiresAt is unset or still in the future.
     *
     * @param string|null $workerName Optional worker name filter
     * @param string|null $method     Optional method filter
     *
     * @return mixed The scalar count reported by the query
     */
    public function getJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $qb = $objectManager
            ->createQueryBuilder()
            ->select('count(j)')
            ->from($this->getObjectName(), 'j')
            ->where('j.locked is NULL');

        $this->applyRunnableWindow($qb, new \DateTime());
        $this->applyWorkerAndMethodFilter($qb, 'j', $workerName, $method);

        return $qb->getQuery()->getSingleScalarResult();
    }

    /**
     * Get Jobs statuses.
     *
     * Aggregates counts from both the live job entity and the archive entity.
     *
     * @return array Map of "workerName->method" to a status => count map, sorted by status key
     */
    public function getStatus()
    {
        $result = [];
        $this->getStatusByEntityName($this->getObjectName(), $result);
        // Second pass covers archived jobs; querying the live entity twice double-counted it.
        $this->getStatusByEntityName($this->getArchiveObjectName(), $result);

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }

    /**
     * Adds the per worker/method/status counts of one entity to $result.
     *
     * @param string $entityName Entity to aggregate
     * @param array  $result     Accumulator, keyed by "workerName->method", modified in place
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $rows = $objectManager->getRepository($entityName)
            ->createQueryBuilder('j')
            ->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status')
            ->getQuery()
            ->getArrayResult();

        foreach ($rows as $row) {
            $key = $row['workerName'].'->'.$row['method'];
            if (!isset($result[$key])) {
                $result[$key] = [
                    Job::STATUS_NEW => 0,
                    Job::STATUS_RUNNING => 0,
                    Job::STATUS_SUCCESS => 0,
                    Job::STATUS_ERROR => 0,
                ];
            }

            // Statuses outside the four seeded above would otherwise trigger an undefined index notice.
            $status = $row['status'];
            if (!isset($result[$key][$status])) {
                $result[$key][$status] = 0;
            }
            $result[$key][$status] += intval($row['c']);
        }
    }

    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * The candidate row is selected with a pessimistic write lock inside a transaction,
     * then marked as locked/running. The changes are flushed before the transaction is
     * committed so that the update is written while the row lock is still held.
     *
     * @param string $workerName
     * @param string $methodName
     * @param bool   $prioritize When true, order by priority first and whenAt second
     * @param mixed  $runId      Run identifier stored on the job
     *
     * @return Job|null The claimed job, or null when no job is available
     *
     * @throws \Exception When the query or the update fails (the transaction is rolled back first)
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $objectManager->beginTransaction();

        try {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            $qb = $repository->createQueryBuilder('j');
            $qb->select('j')
                ->where('j.status = :status')
                ->setParameter(':status', Job::STATUS_NEW)
                ->andWhere('j.locked is NULL');

            $this->applyRunnableWindow($qb, new \DateTime());
            $this->applyWorkerAndMethodFilter($qb, 'j', $workerName, $methodName);

            if ($prioritize) {
                $qb->add('orderBy', 'j.priority ASC, j.whenAt ASC');
            } else {
                $qb->orderBy('j.whenAt', 'ASC');
            }
            $qb->setMaxResults(1);

            $query = $qb->getQuery();
            $query->setLockMode(LockMode::PESSIMISTIC_WRITE);
            $jobs = $query->getResult();

            if ($jobs) {
                /** @var Job $job */
                $job = $jobs[0];
                if (!$job) {
                    $hash = hash('sha256', uniqid(gethostname().'-'.getmypid(), true));
                    throw new \Exception("No job found for $hash, even though last result was count ".count($jobs));
                }

                $job->setLocked(true);
                $job->setLockedAt(new \DateTime());
                $job->setStatus(Job::STATUS_RUNNING);
                $job->setRunId($runId);

                // Write the update first, then commit: committing first would release the
                // row lock before the job is marked as taken.
                $objectManager->flush();
                $objectManager->commit();

                return $job;
            }
        } catch (\Exception $exception) {
            // Never leave the transaction open when something goes wrong.
            $objectManager->rollback();

            throw $exception;
        }

        // Nothing to run: release the transaction opened above.
        $objectManager->rollback();

        return null;
    }

    /**
     * Appends optional worker name / method equality filters (AND) to a query builder.
     *
     * @param QueryBuilder $qb
     * @param string       $alias      Root alias used in the query
     * @param string|null  $workerName Skipped when null
     * @param string|null  $method     Skipped when null
     */
    private function applyWorkerAndMethodFilter(QueryBuilder $qb, $alias, $workerName = null, $method = null)
    {
        if (null !== $workerName) {
            $qb->andWhere($alias.'.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            $qb->andWhere($alias.'.method = :method')
                ->setParameter(':method', $method);
        }
    }

    /**
     * Appends the "may run at $now" conditions for alias "j" (AND):
     * whenAt is null or <= $now, and expiresAt is null or > $now.
     *
     * @param QueryBuilder $qb
     * @param \DateTime    $now
     */
    private function applyRunnableWindow(QueryBuilder $qb, \DateTime $now)
    {
        $qb
            ->andWhere($qb->expr()->orX(
                $qb->expr()->isNull('j.whenAt'),
                $qb->expr()->lte('j.whenAt', ':whenAt')
            ))
            ->andWhere($qb->expr()->orX(
                $qb->expr()->isNull('j.expiresAt'),
                $qb->expr()->gt('j.expiresAt', ':expiresAt')
            ))
            ->setParameter(':whenAt', $now)
            ->setParameter(':expiresAt', $now);
    }
}
302
|
|
|
|
This check looks at variables that have been passed in as parameters and are passed out again to other methods.
If the outgoing method call has stricter type requirements than the method itself, an issue is raised.
An additional type check may prevent trouble.