Passed
Push — master ( 4d8734...991786 )
by Matthew
09:35 queued 06:44
created

JobManager::takeJob()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 27
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 3.0015

Importance

Changes 0
Metric Value
cc 3
eloc 19
nc 4
nop 2
dl 0
loc 27
ccs 17
cts 18
cp 0.9444
crap 3.0015
rs 9.6333
c 0
b 0
f 0
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Util\Util;
13
14
class JobManager extends DoctrineJobManager
15
{
16
    use CommonTrait;
0 ignored issues
show
introduced by
The trait Dtc\QueueBundle\ORM\CommonTrait requires some properties which are not provided by Dtc\QueueBundle\ORM\JobManager: $generatorType, $idGenerator
Loading history...
17
    protected static $saveInsertCalled = null;
18
    protected static $resetInsertCalled = null;
19
20 31
    /**
     * Returns the object manager supplied by getObjectManagerReset().
     *
     * @return mixed the value of getObjectManagerReset(); the other methods of this class use it as an EntityManager
     */
    public function getObjectManager()
    {
        $objectManager = $this->getObjectManagerReset();

        return $objectManager;
    }
24
25 3
    /**
     * Counts the rows of $objectName that have the given status, optionally
     * narrowed to a single worker name and/or a single method.
     *
     * @param string      $objectName entity class name whose rows are counted
     * @param mixed       $status     value compared against the status field
     * @param string|null $workerName when not null, only rows with this workerName are counted
     * @param string|null $method     when not null, only rows with this method are counted
     *
     * @return mixed the scalar count produced by the query, or 0 when that result is falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $queryBuilder = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status');

        if (null !== $workerName) {
            $queryBuilder->andWhere('a.workerName = :workerName')
                ->setParameter('workerName', $workerName);
        }

        if (null !== $method) {
            // Bind the method filter to $method (it was previously bound to $workerName by mistake).
            $queryBuilder->andWhere('a.method = :method')
                ->setParameter('method', $method);
        }

        $count = $queryBuilder->setParameter('status', $status)
            ->getQuery()->getSingleScalarResult();

        // Normalise any falsy scalar result to integer zero.
        if (!$count) {
            return 0;
        }

        return $count;
    }
55
56
    /**
     * Deletes rows of the job archive class whose status is BaseJob::STATUS_EXCEPTION,
     * optionally limited to one worker name and/or method.
     *
     * @param string|null $workerName when not null, only rows with this workerName are deleted
     * @param string|null $method     when not null, only rows with this method are deleted
     *
     * @return int Count of jobs pruned
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $deleteBuilder = $entityManager->createQueryBuilder();
        $deleteBuilder->delete($this->getJobArchiveClass(), 'j');
        $deleteBuilder->where('j.status = :status');
        $deleteBuilder->setParameter('status', BaseJob::STATUS_EXCEPTION);
        $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

        $deleted = $deleteBuilder->getQuery()->execute();

        return intval($deleted);
    }
75
76
    /**
     * Appends optional workerName / method restrictions (on alias "j") to a query builder.
     *
     * Each restriction is added with andWhere() and only when its value is not null.
     *
     * @param QueryBuilder $queryBuilder builder whose root alias is "j"
     * @param string|null  $workerName   value for j.workerName, or null to skip the restriction
     * @param string|null  $method       value for j.method, or null to skip the restriction
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        if (null !== $workerName) {
            $queryBuilder->andWhere('j.workerName = :workerName');
            $queryBuilder->setParameter('workerName', $workerName);
        }

        if (null !== $method) {
            $queryBuilder->andWhere('j.method = :method');
            $queryBuilder->setParameter('method', $method);
        }
    }
90
91 1
    /**
     * Sets the status of jobs to Job::STATUS_EXPIRED when they are still in
     * BaseJob::STATUS_NEW and their expiresAt is at or before the current time.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are updated
     * @param string|null $method     when not null, only jobs with this method are updated
     *
     * @return int result of the update query converted with intval()
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $updateBuilder = $entityManager->createQueryBuilder();
        $updateBuilder->update($this->getJobClass(), 'j');
        $updateBuilder->set('j.status', ':newStatus');
        $updateBuilder->where('j.expiresAt <= :expiresAt');
        $updateBuilder->setParameter('expiresAt', Util::getMicrotimeDateTime());
        $updateBuilder->andWhere('j.status = :status');
        $updateBuilder->setParameter('status', BaseJob::STATUS_NEW);
        $updateBuilder->setParameter('newStatus', Job::STATUS_EXPIRED);
        $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

        $updated = $updateBuilder->getQuery()->execute();

        return intval($updated);
    }
108
109
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() using the job archive class and its "updatedAt" field.
     *
     * @param \DateTime $olderThan cut-off passed through to removeOlderThan()
     *
     * @return mixed whatever removeOlderThan() returns
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
122
123 2
    /**
     * Counts the jobs that match the standard "ready to run" predicate,
     * optionally limited to one worker name and/or method.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are counted
     * @param string|null $method     when not null, only jobs with this method are counted
     *
     * @return mixed the scalar count returned by the query
     */
    public function getWaitingJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $queryBuilder = $objectManager->createQueryBuilder();

        $queryBuilder = $queryBuilder->select('count(j)')->from($this->getJobClass(), 'j');

        // addStandardPredicate() starts its restrictions with where(), which replaces any
        // restrictions added before it. It therefore has to run first; otherwise the
        // worker/method filters are discarded while their parameters remain bound.
        $this->addStandardPredicate($queryBuilder);
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);

        $query = $queryBuilder->getQuery();

        return $query->getSingleScalarResult();
    }
138
139
    /**
     * Counts the jobs in the job repository that match the standard predicate,
     * optionally limited to one worker name and/or method.
     *
     * @param string|null $workerName when not null, only jobs with this workerName are counted
     * @param string|null $methodName when not null, only jobs with this method are counted
     *
     * @return mixed the scalar count returned by the query
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var QueryBuilder $countBuilder */
        $countBuilder = $this->getRepository()->createQueryBuilder('j');
        $countBuilder->select('count(j.id)');

        // Standard predicate first: it resets the where clause before the filters are appended.
        $this->addStandardPredicate($countBuilder);
        $this->addWorkerNameCriterion($countBuilder, $workerName, $methodName);

        $query = $countBuilder->getQuery();

        return $query->getSingleScalarResult();
    }
154
155
    /**
     * Get Jobs statuses.
     *
     * Collects per-status counts for both the job class and the job archive class
     * through getStatusByEntityName(), then sorts each entry's statuses by key.
     *
     * @return array map of the labels built by getStatusByEntityName() to [status => count] arrays
     */
    public function getStatus()
    {
        $result = [];
        $this->getStatusByEntityName($this->getJobClass(), $result);
        $this->getStatusByEntityName($this->getJobArchiveClass(), $result);

        // Both calls accumulate into the same $result, so every label appears exactly once
        // here; no further merging is needed, only the per-label sort.
        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            $finalResult[$key] = $item;
        }

        return $finalResult;
    }
178
179
    /**
     * Adds the per-status row counts of one entity class to $result.
     *
     * Rows are grouped by workerName, method and status. Each group is added to
     * $result["<workerName>-><method>()"][<status>]; a label seen for the first time
     * is initialised from static::getAllStatuses().
     *
     * @param string $entityName entity class to query
     * @param array  $result     accumulator, modified in place
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $groupedBuilder = $entityManager->getRepository($entityName)->createQueryBuilder('j');
        $groupedBuilder->select('j.workerName, j.method, j.status, count(j) as c');
        $groupedBuilder->groupBy('j.workerName, j.method, j.status');
        $rows = $groupedBuilder->getQuery()->getArrayResult();

        foreach ($rows as $row) {
            $label = $row['workerName'].'->'.$row['method'].'()';
            if (!isset($result[$label])) {
                $result[$label] = static::getAllStatuses();
            }
            $result[$label][$row['status']] += intval($row['c']);
        }
    }
197
198
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Fetches candidate job ids in pages of 100 and tries to claim each one with
     * takeJob(). The first job successfully claimed is returned. When a page is
     * exhausted without a claim the query is run again, until it returns no candidates.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize passed to getJobQueryBuilder() to choose the ordering
     * @param int|null    $runId      passed to takeJob()
     *
     * @return Job|null
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        while (true) {
            /** @var QueryBuilder $candidateBuilder */
            $candidateBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
            $candidateBuilder->select('j.id');
            $candidateBuilder->setMaxResults(100);

            $candidates = $candidateBuilder->getQuery()->getResult();
            if (empty($candidates)) {
                // Nothing left that matches: give up.
                return null;
            }

            foreach ($candidates as $candidate) {
                $claimed = $this->takeJob($candidate['id'], $runId);
                if ($claimed) {
                    return $claimed;
                }
            }
        }
    }
229
230
    /**
     * Builds the query used to look for runnable jobs.
     *
     * Applies the standard predicate and the optional worker/method filters, then
     * orders by priority (descending) and whenUs (ascending) when $prioritize is
     * truthy, or by whenUs (ascending) only otherwise.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @return QueryBuilder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var QueryBuilder $jobBuilder */
        $jobBuilder = $this->getRepository()->createQueryBuilder('j');
        $this->addStandardPredicate($jobBuilder);
        $this->addWorkerNameCriterion($jobBuilder, $workerName, $methodName);

        if (!$prioritize) {
            $jobBuilder->orderBy('j.whenUs', 'ASC');

            return $jobBuilder;
        }

        $jobBuilder->addOrderBy('j.priority', 'DESC');
        $jobBuilder->addOrderBy('j.whenUs', 'ASC');

        return $jobBuilder;
    }
254
255 17
    /**
     * Restricts a query (alias "j") to jobs with the given status whose whenUs is
     * null or not later than now, and whose expiresAt is null or later than now.
     *
     * The first restriction is set with where(), so restrictions added to the
     * builder before this call are replaced; add further criteria afterwards.
     *
     * @param QueryBuilder $queryBuilder builder whose root alias is "j"
     * @param mixed        $status       status to match, BaseJob::STATUS_NEW by default
     */
    protected function addStandardPredicate(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
    {
        $now = Util::getMicrotimeDateTime();
        $nowDecimal = Util::getMicrotimeDecimalFormat($now);

        $dueNow = $queryBuilder->expr()->orX(
            $queryBuilder->expr()->isNull('j.whenUs'),
            $queryBuilder->expr()->lte('j.whenUs', ':whenUs')
        );
        $notExpired = $queryBuilder->expr()->orX(
            $queryBuilder->expr()->isNull('j.expiresAt'),
            $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
        );

        $queryBuilder->where('j.status = :status');
        $queryBuilder->setParameter('status', $status);
        $queryBuilder->andWhere($dueNow);
        $queryBuilder->andWhere($notExpired);
        $queryBuilder->setParameter('whenUs', $nowDecimal);
        $queryBuilder->setParameter('expiresAt', $now);
    }
273
274
    /**
     * Tries to claim a job by switching it from BaseJob::STATUS_NEW to
     * BaseJob::STATUS_RUNNING in a single UPDATE restricted to that id and status.
     *
     * startedAt is set to the current time and runId is set when one is given.
     * The job counts as claimed only when the UPDATE reports exactly one affected row.
     *
     * @param mixed    $jobId id of the job to claim
     * @param int|null $runId run id to store on the job, or null to leave it untouched
     *
     * @return mixed the job as returned by findRefresh() when claimed, null otherwise
     */
    protected function takeJob($jobId, $runId = null)
    {
        /** @var QueryBuilder $claimBuilder */
        $claimBuilder = $this->getRepository()->createQueryBuilder('j');
        $claimBuilder->update();
        $claimBuilder->set('j.status', ':status');
        $claimBuilder->setParameter('status', BaseJob::STATUS_RUNNING);

        if (null !== $runId) {
            $claimBuilder->set('j.runId', ':runId');
            $claimBuilder->setParameter('runId', $runId);
        }

        $claimBuilder->set('j.startedAt', ':startedAt');
        $claimBuilder->setParameter('startedAt', Util::getMicrotimeDateTime());

        // Match on both id and the "new" status so a job already taken is not updated again.
        $claimBuilder->where('j.id = :id');
        $claimBuilder->setParameter('id', $jobId);
        $claimBuilder->andWhere('j.status = :statusNew');
        $claimBuilder->setParameter('statusNew', BaseJob::STATUS_NEW);

        $affectedRows = $claimBuilder->getQuery()->execute();

        return 1 === $affectedRows ? $this->findRefresh($jobId) : null;
    }
305
306 11
    /**
     * Returns the job with the given id in its current database state.
     *
     * When the unit of work already holds a Job for this id, that instance is
     * refreshed and returned; otherwise the job is looked up through the repository.
     *
     * @param mixed $id job identifier
     *
     * @return mixed the refreshed Job, or the result of the repository's find()
     */
    protected function findRefresh($id)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $managed = $entityManager->getUnitOfWork()->tryGetById(['id' => $id], $this->getJobClass());

        if (!$managed instanceof Job) {
            return $this->getRepository()->find($id);
        }

        $entityManager->refresh($managed);

        return $managed;
    }
318
319
    /**
     * Tries to update the nearest job as a batch.
     *
     * Looks up the earliest (by whenUs) job in BaseJob::STATUS_NEW with the same
     * crcHash as $job. When one exists it is updated, via updateBatchJob(), to the
     * higher of the two priorities and the earlier of the two whenUs values.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return null|Job the existing job that was matched, or null when none was found
     *
     * @throws UnsupportedException when $job is not an instance of the ORM Job entity
     */
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!($job instanceof Job)) {
            throw new UnsupportedException('$job must be instance of '.Job::class);
        }

        /** @var QueryBuilder $lookupBuilder */
        $lookupBuilder = $this->getRepository()->createQueryBuilder('j');
        $lookupBuilder->select();
        $lookupBuilder->where('j.crcHash = :crcHash');
        $lookupBuilder->andWhere('j.status = :status');
        $lookupBuilder->setParameter('status', BaseJob::STATUS_NEW);
        $lookupBuilder->setParameter('crcHash', $job->getCrcHash());
        $lookupBuilder->orderBy('j.whenUs', 'ASC');
        $lookupBuilder->setMaxResults(1);

        $matches = $lookupBuilder->getQuery()->execute();
        if (empty($matches)) {
            return null;
        }

        /** @var Job $existingJob */
        $existingJob = $matches[0];

        $newPriority = max($job->getPriority(), $existingJob->getPriority());

        // Keep the existing whenUs unless the incoming job's value compares lower.
        $incomingWhenUs = $job->getWhenUs();
        $existingWhenUs = $existingJob->getWhenUs();
        $newWhenUs = bccomp($incomingWhenUs, $existingWhenUs) < 0 ? $incomingWhenUs : $existingWhenUs;

        $this->updateBatchJob($existingJob, $newPriority, $newWhenUs);

        return $existingJob;
    }
361
362
    /**
     * Writes a new priority and/or whenUs to an existing job, both on the entity
     * object and through an UPDATE query, but only for the values that differ.
     *
     * No query is issued when both values are already identical (strict comparison).
     *
     * @param Job    $existingJob job to update
     * @param int    $newPriority
     * @param string $newWhenUs
     *
     * @return Job the same $existingJob instance
     */
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
    {
        $priorityChanged = $newPriority !== $existingJob->getPriority();
        $whenUsChanged = $newWhenUs !== $existingJob->getWhenUs();

        if (!$priorityChanged && !$whenUsChanged) {
            return $existingJob;
        }

        /** @var QueryBuilder $updateBuilder */
        $updateBuilder = $this->getRepository()->createQueryBuilder('j');
        $updateBuilder->update();

        if ($priorityChanged) {
            $existingJob->setPriority($newPriority);
            $updateBuilder->set('j.priority', ':priority');
            $updateBuilder->setParameter('priority', $newPriority);
        }

        if ($whenUsChanged) {
            $existingJob->setWhenUs($newWhenUs);
            $updateBuilder->set('j.whenUs', ':whenUs');
            $updateBuilder->setParameter('whenUs', $newWhenUs);
        }

        $updateBuilder->where('j.id = :id');
        $updateBuilder->setParameter('id', $existingJob->getId());
        $updateBuilder->getQuery()->execute();

        return $existingJob;
    }
394
395 2
    /**
     * Lists the distinct worker name / method pairs of the jobs that match the
     * standard predicate for the given status.
     *
     * @param mixed $status status handed to addStandardPredicate(), BaseJob::STATUS_NEW by default
     *
     * @return array map of workerName to a list of its methods; empty when nothing matches
     */
    public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
    {
        /** @var QueryBuilder $pairBuilder */
        $pairBuilder = $this->getRepository()->createQueryBuilder('j');
        $this->addStandardPredicate($pairBuilder, $status);
        $pairBuilder->select('DISTINCT j.workerName, j.method');

        $methodsByWorker = [];
        foreach ($pairBuilder->getQuery()->getArrayResult() as $pair) {
            $methodsByWorker[$pair['workerName']][] = $pair['method'];
        }

        return $methodsByWorker;
    }
415
416
    /**
     * Marks every job matching the standard predicate (optionally limited to one
     * worker name and/or method) with Job::STATUS_ARCHIVE and, when at least one
     * row was updated, runs runArchive().
     *
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback forwarded to runArchive()
     */
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        /** @var QueryBuilder $markBuilder */
        $markBuilder = $this->getRepository()->createQueryBuilder('j');
        $markBuilder->update($this->getJobClass(), 'j');
        $markBuilder->set('j.status', ':statusArchive');
        $markBuilder->setParameter('statusArchive', Job::STATUS_ARCHIVE);
        $this->addStandardPredicate($markBuilder);
        $this->addWorkerNameCriterion($markBuilder, $workerName, $methodName);

        $markedCount = $markBuilder->getQuery()->execute();
        if (!$markedCount) {
            // Nothing was marked, so there is nothing to archive.
            return;
        }

        $this->runArchive($progressCallback);
    }
438
439
    /**
     * Processes jobs in 'archive' status in batches of up to 10000.
     *
     * Each job is loaded through the repository and removed via the entity manager.
     * Changes are flushed and progress is reported after every 10 jobs and again at
     * the end of each batch. Another batch is fetched as long as the previous one
     * was completely full.
     *
     * NOTE(review): earlier documentation described this as moving jobs to the archive
     * table with a low-level "INSERT INTO SELECT"; no such query is visible here, so the
     * move presumably happens as a side effect of remove()/flush() - confirm.
     *
     * @param callable|null $progressCallback forwarded to updateProgress() together with the running count
     */
    protected function runArchive(callable $progressCallback = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $batchSize = 10000;
        $processed = 0;

        do {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            $batchBuilder = $repository->createQueryBuilder('j');
            $batchBuilder->where('j.status = :status');
            $batchBuilder->setParameter('status', Job::STATUS_ARCHIVE);
            $batchBuilder->setMaxResults($batchSize);

            $rows = $batchBuilder->getQuery()->getArrayResult();
            foreach ($rows as $row) {
                $job = $repository->find($row['id']);
                if ($job) {
                    $entityManager->remove($job);
                }

                // The counter advances for every row, whether or not the job was found.
                ++$processed;
                if (0 == $processed % 10) {
                    $this->flush();
                    $this->updateProgress($progressCallback, $processed);
                }
            }

            $this->flush();
            $this->updateProgress($progressCallback, $processed);

            $batchWasFull = !empty($rows) && $batchSize == count($rows);
        } while ($batchWasFull);
    }
476
}
477