1 | <?php |
||
2 | |||
3 | namespace Dtc\QueueBundle\ORM; |
||
4 | |||
5 | use Doctrine\ORM\EntityManager; |
||
0 ignored issues
–
show
|
|||
6 | use Doctrine\ORM\EntityRepository; |
||
0 ignored issues
–
show
The type
Doctrine\ORM\EntityRepository was not found. Maybe you did not declare it correctly or list all dependencies?
The issue could also be caused by a filter entry in the build configuration.
If the path has been excluded in your configuration, e.g. filter:
dependency_paths: ["lib/*"]
For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths ![]() |
|||
7 | use Doctrine\ORM\QueryBuilder; |
||
0 ignored issues
–
show
The type
Doctrine\ORM\QueryBuilder was not found. Maybe you did not declare it correctly or list all dependencies?
The issue could also be caused by a filter entry in the build configuration.
If the path has been excluded in your configuration, e.g. filter:
dependency_paths: ["lib/*"]
For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths ![]() |
|||
8 | use Dtc\QueueBundle\Doctrine\DoctrineJobManager; |
||
9 | use Dtc\QueueBundle\Entity\Job; |
||
10 | use Dtc\QueueBundle\Exception\UnsupportedException; |
||
11 | use Dtc\QueueBundle\Model\BaseJob; |
||
12 | use Dtc\QueueBundle\Util\Util; |
||
13 | |||
14 | class JobManager extends DoctrineJobManager |
||
15 | { |
||
16 | 1 | use CommonTrait; |
|
0 ignored issues
–
show
|
|||
17 | protected static $saveInsertCalled = null; |
||
18 | protected static $resetInsertCalled = null; |
||
19 | |||
/**
 * Returns the object manager this job manager works with.
 *
 * Simply forwards to getObjectManagerReset(), which is not defined in this
 * class (presumably provided by CommonTrait or the parent class — confirm).
 */
public function getObjectManager()
{
    $objectManager = $this->getObjectManagerReset();

    return $objectManager;
}
||
24 | |||
/**
 * Counts the entities of class $objectName that are in the given status,
 * optionally narrowed to one worker name and/or one method.
 *
 * @param string      $objectName Entity class to count
 * @param string      $status     Value compared against the entity's status field
 * @param string|null $workerName Only count rows with this worker name (null = no filter)
 * @param string|null $method     Only count rows with this method (null = no filter)
 *
 * @return int|string The scalar count as returned by Doctrine, or 0 when that result is falsy
 */
public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
{
    /** @var EntityManager $objectManager */
    $objectManager = $this->getObjectManager();

    $queryBuilder = $objectManager
        ->createQueryBuilder()
        ->select('count(a.id)')
        ->from($objectName, 'a')
        ->where('a.status = :status')
        ->setParameter('status', $status);

    if (null !== $workerName) {
        $queryBuilder->andWhere('a.workerName = :workerName')
            ->setParameter('workerName', $workerName);
    }

    if (null !== $method) {
        // The :method placeholder must carry $method; it was previously bound
        // to $workerName, which made the method filter match the wrong value.
        $queryBuilder->andWhere('a.method = :method')
            ->setParameter('method', $method);
    }

    $count = $queryBuilder->getQuery()->getSingleScalarResult();

    // Normalise a falsy result (null, '0', 0) to the integer 0.
    return $count ?: 0;
}
||
55 | |||
/**
 * Deletes rows of the job archive class whose status is
 * BaseJob::STATUS_EXCEPTION, optionally limited to a worker name and/or method.
 *
 * @param string|null $workerName
 * @param string|null $method
 *
 * @return int Count of jobs pruned
 */
public function pruneExceptionJobs($workerName = null, $method = null)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $deleteBuilder = $entityManager->createQueryBuilder();
    $deleteBuilder
        ->delete($this->getJobArchiveClass(), 'j')
        ->where('j.status = :status')
        ->setParameter('status', BaseJob::STATUS_EXCEPTION);

    $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

    $affected = $deleteBuilder->getQuery()->execute();

    return intval($affected);
}
||
75 | |||
/**
 * Appends optional worker-name and method filters to a query on alias "j".
 *
 * A filter is only added when its value is not null.
 *
 * @param string|null $workerName
 * @param string|null $method
 */
protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
{
    $hasWorkerName = null !== $workerName;
    $hasMethod = null !== $method;

    if ($hasWorkerName) {
        $queryBuilder->andWhere('j.workerName = :workerName');
        $queryBuilder->setParameter('workerName', $workerName);
    }

    if ($hasMethod) {
        $queryBuilder->andWhere('j.method = :method');
        $queryBuilder->setParameter('method', $method);
    }
}
|
90 | |||
/**
 * Sets the status of jobs to Job::STATUS_EXPIRED when they are still in
 * BaseJob::STATUS_NEW and their expiresAt is at or before the current time.
 *
 * @param string|null $workerName
 * @param string|null $method
 *
 * @return int The result of the update query, converted with intval()
 */
protected function updateExpired($workerName = null, $method = null)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $updateBuilder = $entityManager->createQueryBuilder();
    $updateBuilder
        ->update($this->getJobClass(), 'j')
        ->set('j.status', ':newStatus')
        ->where('j.expiresAt <= :expiresAt')
        ->setParameter('expiresAt', Util::getMicrotimeDateTime())
        ->andWhere('j.status = :status')
        ->setParameter('status', BaseJob::STATUS_NEW)
        ->setParameter('newStatus', Job::STATUS_EXPIRED);

    $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

    $affected = $updateBuilder->getQuery()->execute();

    return intval($affected);
}
||
108 | |||
/**
 * Removes archived jobs older than $olderThan.
 *
 * Delegates to removeOlderThan() with the job archive class and the
 * 'updatedAt' field; that helper is defined outside this class.
 */
public function pruneArchivedJobs(\DateTime $olderThan)
{
    $archiveClass = $this->getJobArchiveClass();

    return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
}
||
120 | |||
/**
 * Counts jobs of the job class that satisfy the standard predicates
 * (see addStandardPredicates()), optionally filtered by worker name/method.
 *
 * @param string|null $workerName
 * @param string|null $method
 *
 * @return mixed The single scalar result of the count query
 */
public function getWaitingJobCount($workerName = null, $method = null)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $countBuilder = $entityManager
        ->createQueryBuilder()
        ->select('count(j)')
        ->from($this->getJobClass(), 'j');

    $this->addStandardPredicates($countBuilder);
    $this->addWorkerNameCriterion($countBuilder, $workerName, $method);

    return $countBuilder->getQuery()->getSingleScalarResult();
}
||
136 | |||
/**
 * Counts the jobs in the repository that satisfy the standard predicates,
 * optionally filtered by worker name and/or method name.
 *
 * @param string|null $workerName
 * @param string|null $methodName
 *
 * @return mixed The single scalar result of the count query
 */
public function countLiveJobs($workerName = null, $methodName = null)
{
    /** @var QueryBuilder $countBuilder */
    $countBuilder = $this->getRepository()->createQueryBuilder('j');

    $this->addStandardPredicates($countBuilder);
    $this->addWorkerNameCriterion($countBuilder, $workerName, $methodName);

    // Swap the select clause for a count once the filters are in place.
    $countBuilder->select('count(j.id)');
    $countQuery = $countBuilder->getQuery();

    return $countQuery->getSingleScalarResult();
}
||
152 | |||
/**
 * Get Jobs statuses.
 *
 * Collects per-"worker->method()" status counts from both the job class and
 * the job archive class, then returns them with each status list sorted by key.
 */
public function getStatus(): array
{
    $collected = [];
    foreach ([$this->getJobClass(), $this->getJobArchiveClass()] as $entityName) {
        $this->getStatusByEntityName($entityName, $collected);
    }

    $finalResult = [];
    foreach ($collected as $key => $statusCounts) {
        ksort($statusCounts);
        foreach ($statusCounts as $status => $count) {
            $finalResult[$key][$status] = isset($finalResult[$key][$status])
                ? $finalResult[$key][$status] + $count
                : $count;
        }
    }

    return $finalResult;
}
||
176 | |||
/**
 * Adds the per-status row counts of one entity class to $result.
 *
 * Rows are grouped by worker name, method and status. Each group is filed
 * under the key "<workerName>-><method>()"; a key seen for the first time is
 * seeded with static::getAllStatuses() before the count is added.
 *
 * @param string $entityName
 * @param array  $result     Accumulator, modified in place
 */
protected function getStatusByEntityName($entityName, array &$result)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $groupedBuilder = $entityManager->getRepository($entityName)->createQueryBuilder('j');
    $groupedBuilder
        ->select('j.workerName, j.method, j.status, count(j) as c')
        ->groupBy('j.workerName, j.method, j.status');
    $rows = $groupedBuilder->getQuery()->getArrayResult();

    foreach ($rows as $row) {
        $label = $row['workerName'].'->'.$row['method'].'()';
        if (!isset($result[$label])) {
            $result[$label] = static::getAllStatuses();
        }

        $result[$label][$row['status']] += intval($row['c']);
    }
}
|
195 | |||
/**
 * Get the next job to run (can be filtered by workername and method name).
 *
 * Fetches up to 100 candidate job ids at a time and tries to take each one
 * in order via takeJob(). The first job successfully taken is returned. If a
 * whole page of candidates could not be taken, a fresh page is queried; the
 * search stops once a query returns no candidates.
 *
 * @param string|null $workerName
 * @param string|null $methodName
 * @param bool        $prioritize
 * @param int|null    $runId
 *
 * @return Job|null
 */
public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
{
    while (true) {
        /** @var QueryBuilder $candidateBuilder */
        $candidateBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
        $candidateBuilder->select('j.id');
        $candidateBuilder->setMaxResults(100);

        $candidates = $candidateBuilder->getQuery()->getResult();
        if (empty($candidates)) {
            return null;
        }

        foreach ($candidates as $candidate) {
            $taken = $this->takeJob($candidate['id'], $runId);
            if ($taken) {
                return $taken;
            }
        }
    }
}
||
227 | |||
/**
 * Builds the query for runnable jobs: standard predicates, optional
 * worker/method filters, and an ordering.
 *
 * With $prioritize the result is ordered by priority (descending) and then
 * whenUs (ascending); otherwise by whenUs (ascending) only.
 *
 * @param string|null $workerName
 * @param string|null $methodName
 * @param bool        $prioritize
 *
 * @return QueryBuilder
 */
public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
{
    /** @var QueryBuilder $builder */
    $builder = $this->getRepository()->createQueryBuilder('j');

    $this->addStandardPredicates($builder);
    $this->addWorkerNameCriterion($builder, $workerName, $methodName);

    if (!$prioritize) {
        $builder->orderBy('j.whenUs', 'ASC');

        return $builder;
    }

    $builder->addOrderBy('j.priority', 'DESC');
    $builder->addOrderBy('j.whenUs', 'ASC');

    return $builder;
}
||
252 | |||
/**
 * Adds the conditions shared by the job queries on alias "j":
 *  - status equals $status,
 *  - whenUs is null or not later than the current microtime integer,
 *  - expiresAt is null or later than the current time.
 *
 * @param mixed $status Status to match; defaults to BaseJob::STATUS_NEW
 */
protected function addStandardPredicates(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
{
    $now = Util::getMicrotimeDateTime();
    $nowUs = Util::getMicrotimeIntegerFormat($now);

    $queryBuilder->andWhere('j.status = :status');
    $queryBuilder->setParameter('status', $status);

    $isDue = $queryBuilder->expr()->orX(
        $queryBuilder->expr()->isNull('j.whenUs'),
        $queryBuilder->expr()->lte('j.whenUs', ':whenUs')
    );
    $queryBuilder->andWhere($isDue);

    $notExpired = $queryBuilder->expr()->orX(
        $queryBuilder->expr()->isNull('j.expiresAt'),
        $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
    );
    $queryBuilder->andWhere($notExpired);

    $queryBuilder->setParameter('whenUs', $nowUs);
    $queryBuilder->setParameter('expiresAt', $now);
}
|
271 | |||
/**
 * Tries to claim a job by switching it from BaseJob::STATUS_NEW to
 * BaseJob::STATUS_RUNNING in a single UPDATE.
 *
 * The UPDATE only matches while the row is still in STATUS_NEW, so a job
 * whose status has already changed is not claimed. startedAt is set to the
 * current time, and runId is stored when one is given.
 *
 * @param mixed    $jobId
 * @param int|null $runId
 *
 * @return mixed The refreshed job when exactly one row was updated, null otherwise
 */
protected function takeJob($jobId, $runId = null)
{
    /** @var QueryBuilder $claimBuilder */
    $claimBuilder = $this->getRepository()->createQueryBuilder('j');

    $claimBuilder->update();
    $claimBuilder->set('j.status', ':status');
    $claimBuilder->setParameter('status', BaseJob::STATUS_RUNNING);

    if (null !== $runId) {
        $claimBuilder->set('j.runId', ':runId');
        $claimBuilder->setParameter('runId', $runId);
    }

    $claimBuilder
        ->set('j.startedAt', ':startedAt')
        ->setParameter('startedAt', Util::getMicrotimeDateTime())
        ->where('j.id = :id')
        ->setParameter('id', $jobId)
        ->andWhere('j.status = :statusNew')
        ->setParameter('statusNew', BaseJob::STATUS_NEW);

    $affectedRows = $claimBuilder->getQuery()->execute();

    return 1 === $affectedRows ? $this->findRefresh($jobId) : null;
}
||
303 | |||
/**
 * Returns the job with the given id, reflecting its current database state.
 *
 * If the unit of work already holds a Job instance for this id, that instance
 * is refreshed and returned; otherwise the job is looked up through the
 * repository.
 *
 * @param mixed $id
 */
protected function findRefresh($id)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $managed = $entityManager->getUnitOfWork()->tryGetById(['id' => $id], $this->getJobClass());
    if (!($managed instanceof Job)) {
        return $this->getRepository()->find($id);
    }

    $entityManager->refresh($managed);

    return $managed;
}
||
316 | |||
/**
 * Tries to update the nearest job as a batch.
 *
 * Looks up the STATUS_NEW job with the same crcHash and the smallest whenUs.
 * When one exists, it receives the higher of the two priorities and the
 * earlier of the two whenUs values (compared with bccomp()).
 *
 * @return Job|null The existing job that was merged into, or null if none matched
 *
 * @throws UnsupportedException when $job is not an ORM Job entity
 */
public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
{
    if (!$job instanceof Job) {
        throw new UnsupportedException('$job must be instance of '.Job::class);
    }

    /** @var QueryBuilder $lookupBuilder */
    $lookupBuilder = $this->getRepository()->createQueryBuilder('j');
    $lookupBuilder->select()
        ->where('j.crcHash = :crcHash')
        ->andWhere('j.status = :status')
        ->setParameter('status', BaseJob::STATUS_NEW)
        ->setParameter('crcHash', $job->getCrcHash())
        ->orderBy('j.whenUs', 'ASC')
        ->setMaxResults(1);

    $matches = $lookupBuilder->getQuery()->execute();
    if (empty($matches)) {
        return null;
    }

    /** @var Job $existingJob */
    $existingJob = $matches[0];

    $mergedPriority = max($job->getPriority(), $existingJob->getPriority());

    $incomingWhenUs = $job->getWhenUs();
    $currentWhenUs = $existingJob->getWhenUs();
    // Keep whichever whenUs is earlier; ties keep the existing job's value.
    $mergedWhenUs = bccomp($incomingWhenUs, $currentWhenUs) < 0 ? $incomingWhenUs : $currentWhenUs;

    $this->updateBatchJob($existingJob, $mergedPriority, $mergedWhenUs);

    return $existingJob;
}
||
357 | |||
/**
 * Writes a changed priority and/or whenUs to an existing job, both on the
 * entity object and through a direct UPDATE query. Does nothing when neither
 * value differs (strict comparison) from the job's current values.
 *
 * @param int    $newPriority
 * @param string $newWhenUs
 *
 * @return Job The same $existingJob instance
 */
protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
{
    $priorityChanged = $newPriority !== $existingJob->getPriority();
    $whenUsChanged = $newWhenUs !== $existingJob->getWhenUs();

    if (!$priorityChanged && !$whenUsChanged) {
        return $existingJob;
    }

    /** @var QueryBuilder $updateBuilder */
    $updateBuilder = $this->getRepository()->createQueryBuilder('j');
    $updateBuilder->update();

    if ($priorityChanged) {
        $existingJob->setPriority($newPriority);
        $updateBuilder->set('j.priority', ':priority');
        $updateBuilder->setParameter('priority', $newPriority);
    }

    if ($whenUsChanged) {
        $existingJob->setWhenUs($newWhenUs);
        $updateBuilder->set('j.whenUs', ':whenUs');
        $updateBuilder->setParameter('whenUs', $newWhenUs);
    }

    $updateBuilder
        ->where('j.id = :id')
        ->setParameter('id', $existingJob->getId());
    $updateBuilder->getQuery()->execute();

    return $existingJob;
}
||
390 | |||
/**
 * Lists the distinct worker name / method pairs among the jobs that satisfy
 * the standard predicates for the given status.
 *
 * @param mixed $status Status to match; defaults to BaseJob::STATUS_NEW
 *
 * @return array Map of worker name => list of method names (empty when nothing matches)
 */
public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
{
    /** @var QueryBuilder $pairBuilder */
    $pairBuilder = $this->getRepository()->createQueryBuilder('j');
    $this->addStandardPredicates($pairBuilder, $status);
    $pairBuilder->select('DISTINCT j.workerName, j.method');

    $workerMethods = [];
    // No rows means the loop body never runs and the empty map is returned.
    foreach ($pairBuilder->getQuery()->getArrayResult() as $pair) {
        $workerMethods[$pair['workerName']][] = $pair['method'];
    }

    return $workerMethods;
}
||
411 | |||
/**
 * Marks every job that satisfies the standard predicates (optionally limited
 * to a worker name / method name) with Job::STATUS_ARCHIVE and, if the update
 * reported any affected rows, runs runArchive() on them.
 *
 * @param string|null   $workerName
 * @param string|null   $methodName
 * @param callable|null $progressCallback Passed through to runArchive()
 */
public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
{
    /** @var QueryBuilder $markBuilder */
    $markBuilder = $this->getRepository()->createQueryBuilder('j');
    $markBuilder->update($this->getJobClass(), 'j');
    $markBuilder->set('j.status', ':statusArchive');
    $markBuilder->setParameter('statusArchive', Job::STATUS_ARCHIVE);

    $this->addStandardPredicates($markBuilder);
    $this->addWorkerNameCriterion($markBuilder, $workerName, $methodName);

    $markedCount = $markBuilder->getQuery()->execute();
    if (!$markedCount) {
        return;
    }

    $this->runArchive($progressCallback);
}
|
433 | |||
/**
 * Processes jobs in 'archive' status by removing them through the entity
 * manager, in pages of up to 10000 rows.
 *
 * Each job is loaded by id and removed; changes are flushed and progress is
 * reported after every 10 processed rows and again at the end of each page.
 * Paging continues for as long as a page comes back full.
 *
 * NOTE(review): an earlier description of this method spoke of a server-side
 * "INSERT INTO SELECT"; the code here removes entities one by one, so the move
 * into the archive table presumably happens elsewhere (e.g. on removal) —
 * confirm.
 */
protected function runArchive(callable $progressCallback = null)
{
    /** @var EntityManager $entityManager */
    $entityManager = $this->getObjectManager();

    $pageSize = 10000;
    $flushInterval = 10;
    $processed = 0;

    do {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $rows = $repository->createQueryBuilder('j')
            ->where('j.status = :status')
            ->setParameter('status', Job::STATUS_ARCHIVE)
            ->setMaxResults($pageSize)
            ->getQuery()
            ->getArrayResult();

        foreach ($rows as $row) {
            $job = $repository->find($row['id']);
            if ($job) {
                $entityManager->remove($job);
            }

            // Rows are counted even when the entity could not be found.
            ++$processed;
            if (0 === $processed % $flushInterval) {
                $this->flush();
                $this->updateProgress($progressCallback, $processed);
            }
        }

        $this->flush();
        $this->updateProgress($progressCallback, $processed);
    } while ($pageSize === count($rows));
}
|
469 | } |
||
470 |
The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g.
excluded_paths: ["lib/*"]
you can move it to the dependency path list as follows:
dependency_paths: ["lib/*"]
For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths