Passed
Pull Request — master (#57)
by Matthew
15:03
created

JobManager::updateBatchJob()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 27
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 5

Importance

Changes 0
Metric Value
dl 0
loc 27
ccs 22
cts 22
cp 1
rs 8.439
c 0
b 0
f 0
cc 5
eloc 18
nc 5
nop 3
crap 5
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Util\Util;
13
use Symfony\Component\Process\Exception\LogicException;
14
15
class JobManager extends DoctrineJobManager
16
{
17
    use CommonTrait;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20
21 31
    /**
     * Returns the object manager used by this job manager.
     *
     * Delegates to getObjectManagerReset(), which is defined outside this
     * method (presumably in CommonTrait or a parent class - TODO confirm).
     *
     * @return mixed Whatever getObjectManagerReset() returns; other methods in
     *               this class treat it as a Doctrine EntityManager
     */
    public function getObjectManager()
    {
        $objectManager = $this->getObjectManagerReset();

        return $objectManager;
    }
25
26 3
    /**
     * Counts the entities of $objectName that are in the given status.
     *
     * @param string      $objectName Entity class name to query
     * @param mixed       $status     Value compared against the entity's "status" field
     * @param string|null $workerName Restricts the count to this worker name when not null
     * @param string|null $method     Restricts the count to this method when not null
     *
     * @return int|string The scalar count returned by Doctrine, or 0 when that result is falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $queryBuilder = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status')
            ->setParameter('status', $status);

        if (null !== $workerName) {
            $queryBuilder->andWhere('a.workerName = :workerName')
                ->setParameter('workerName', $workerName);
        }

        if (null !== $method) {
            // The :method placeholder must receive $method, not $workerName.
            $queryBuilder->andWhere('a.method = :method')
                ->setParameter('method', $method);
        }

        $count = $queryBuilder->getQuery()->getSingleScalarResult();

        // Normalise any falsy result (null, '0', 0, ...) to the integer 0.
        return $count ?: 0;
    }
56
57
    /**
     * Deletes entities of the job archive class whose status is
     * BaseJob::STATUS_EXCEPTION, optionally limited to one worker and/or method.
     *
     * @param string|null $workerName Only delete jobs for this worker when not null
     * @param string|null $method     Only delete jobs for this method when not null
     *
     * @return int Count of jobs pruned
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $deleteBuilder = $entityManager->createQueryBuilder();
        $deleteBuilder
            ->delete($this->getJobArchiveClass(), 'j')
            ->where('j.status = :status')
            ->setParameter('status', BaseJob::STATUS_EXCEPTION);
        $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

        $deleted = $deleteBuilder->getQuery()->execute();

        return intval($deleted);
    }
76
77 21
    /**
     * Guards against mixing "save" and "reset" operations on the same object manager.
     *
     * The spl_object_hash() of the current object manager is compared with the
     * hash recorded for the opposite operation; when they match a
     * LogicException is thrown. Otherwise the hash is recorded for the
     * operation being performed.
     *
     * @param string $function 'save' for a save operation; any other value is
     *                         treated as a reset
     *
     * @throws LogicException when the opposite operation was already recorded
     *                        for the same object manager instance
     */
    protected function resetSaveOk($function)
    {
        $managerHash = spl_object_hash($this->getObjectManager());
        $isSave = 'save' === $function;

        // A save conflicts with an earlier reset and vice versa.
        $conflictingHash = $isSave ? static::$resetInsertCalled : static::$saveInsertCalled;
        if ($managerHash === $conflictingHash) {
            // Insert SQL is cached...
            throw new LogicException("Can't call save and reset within the same process cycle (or using the same EntityManager)");
        }

        if ($isSave) {
            static::$saveInsertCalled = $managerHash;

            return;
        }

        static::$resetInsertCalled = $managerHash;
    }
100
101
    /**
     * Appends optional worker-name and method filters to a query on alias "j".
     *
     * Each non-null value adds an andWhere() condition and binds the matching
     * named parameter; null values add nothing.
     *
     * @param QueryBuilder $queryBuilder Builder to extend (modified in place)
     * @param string|null  $workerName   Value for j.workerName, or null for no filter
     * @param string|null  $method       Value for j.method, or null for no filter
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        $filters = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($filters as $field => $value) {
            if (null === $value) {
                continue;
            }

            // Builds e.g. "j.workerName = :workerName".
            $queryBuilder->andWhere('j.'.$field.' = :'.$field);
            $queryBuilder->setParameter($field, $value);
        }
    }
115
116 1
    /**
     * Marks jobs as expired.
     *
     * Sets the status to Job::STATUS_EXPIRED on every job that is still in
     * BaseJob::STATUS_NEW and whose expiresAt is less than or equal to the
     * current time, optionally limited to one worker and/or method.
     *
     * @param string|null $workerName Only update jobs for this worker when not null
     * @param string|null $method     Only update jobs for this method when not null
     *
     * @return int Integer value of the update query's result
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $updateBuilder = $entityManager->createQueryBuilder();
        $updateBuilder
            ->update($this->getJobClass(), 'j')
            ->set('j.status', ':newStatus')
            ->where('j.expiresAt <= :expiresAt')
            ->andWhere('j.status = :status')
            ->setParameter('expiresAt', Util::getMicrotimeDateTime())
            ->setParameter('status', BaseJob::STATUS_NEW)
            ->setParameter('newStatus', Job::STATUS_EXPIRED);
        $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

        $updated = $updateBuilder->getQuery()->execute();

        return intval($updated);
    }
133
134
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() with the job archive class and the
     * "updatedAt" field.
     *
     * @param \DateTime $olderThan Cut-off date passed to removeOlderThan()
     *
     * @return mixed Result of removeOlderThan() (presumably the number of
     *               removed jobs - TODO confirm)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
147
148 2
    /**
     * Counts the jobs matched by the standard predicate (see
     * addStandardPredicate()), optionally limited to one worker and/or method.
     *
     * @param string|null $workerName Only count jobs for this worker when not null
     * @param string|null $method     Only count jobs for this method when not null
     *
     * @return mixed Scalar count as returned by Doctrine's getSingleScalarResult()
     */
    public function getWaitingJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $queryBuilder = $objectManager->createQueryBuilder()
            ->select('count(j)')
            ->from($this->getJobClass(), 'j');

        // addStandardPredicate() starts the WHERE clause with where(), which
        // replaces any earlier restrictions. It therefore has to run before
        // the worker/method filters are appended with andWhere(); otherwise
        // those filters are dropped while their parameters stay bound.
        $this->addStandardPredicate($queryBuilder);
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);

        return $queryBuilder->getQuery()->getSingleScalarResult();
    }
163
164
    /**
     * Get Jobs statuses.
     *
     * Collects per-status counts from both the job class and the job archive
     * class via getStatusByEntityName(), then returns them with each inner
     * array sorted by status key.
     *
     * @return array Map of "worker->method()" label to an array of status => count
     */
    public function getStatus()
    {
        $collected = [];
        foreach ([$this->getJobClass(), $this->getJobArchiveClass()] as $entityName) {
            $this->getStatusByEntityName($entityName, $collected);
        }

        $finalResult = [];
        foreach ($collected as $label => $statusCounts) {
            ksort($statusCounts);
            foreach ($statusCounts as $status => $count) {
                $finalResult[$label][$status] = isset($finalResult[$label][$status])
                    ? $finalResult[$label][$status] + $count
                    : $count;
            }
        }

        return $finalResult;
    }
187
188
    /**
     * Adds the per-status job counts of one entity class to $result.
     *
     * Rows are grouped by worker name, method and status. Each
     * "worker->method()" label is initialised from static::getAllStatuses()
     * the first time it is seen, and the row count is added to the entry for
     * the row's status.
     *
     * @param string $entityName Entity class whose repository is queried
     * @param array  $result     Accumulator, modified in place
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $groupedQuery = $entityManager->getRepository($entityName)
            ->createQueryBuilder('j')
            ->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status')
            ->getQuery();

        foreach ($groupedQuery->getArrayResult() as $row) {
            $label = $row['workerName'].'->'.$row['method'].'()';
            if (!isset($result[$label])) {
                $result[$label] = static::getAllStatuses();
            }

            $result[$label][$row['status']] += intval($row['c']);
        }
    }
206
207
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Candidate ids are fetched in batches of up to 100 and each one is offered
     * to takeJob(); the first job that is successfully taken is returned. When
     * a whole batch could not be taken the query is repeated, and the search
     * ends once a query returns no candidates.
     *
     * @param string|null $workerName Only consider jobs for this worker when not null
     * @param string|null $methodName Only consider jobs for this method when not null
     * @param bool        $prioritize Passed through to getJobQueryBuilder()
     * @param int|null    $runId      Passed through to takeJob()
     *
     * @return Job|null The taken job, or null when no candidate is left
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        do {
            /** @var QueryBuilder $candidateBuilder */
            $candidateBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
            $candidateBuilder->select('j.id');
            $candidateBuilder->setMaxResults(100);

            $candidates = $candidateBuilder->getQuery()->getResult();
            foreach ($candidates as $candidate) {
                $takenJob = $this->takeJob($candidate['id'], $runId);
                if ($takenJob) {
                    return $takenJob;
                }
            }
        } while (!empty($candidates));

        return null;
    }
238
239
    /**
     * Builds the query used to look for runnable jobs.
     *
     * Applies the standard predicate, then the optional worker/method filters,
     * then the ordering: by priority (descending) and whenUs (ascending) when
     * $prioritize is truthy, otherwise by whenUs (ascending) only.
     *
     * @param string|null $workerName Only match jobs for this worker when not null
     * @param string|null $methodName Only match jobs for this method when not null
     * @param bool        $prioritize Whether to order by priority first
     *
     * @return QueryBuilder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $jobBuilder = $repository->createQueryBuilder('j');

        $this->addStandardPredicate($jobBuilder);
        $this->addWorkerNameCriterion($jobBuilder, $workerName, $methodName);

        if (!$prioritize) {
            $jobBuilder->orderBy('j.whenUs', 'ASC');

            return $jobBuilder;
        }

        $jobBuilder->addOrderBy('j.priority', 'DESC');
        $jobBuilder->addOrderBy('j.whenUs', 'ASC');

        return $jobBuilder;
    }
263
264 17
    /**
     * Sets the base WHERE clause shared by the job queries on alias "j".
     *
     * A row matches when its status equals $status, its whenUs is NULL or not
     * later than the current time, and its expiresAt is NULL or later than the
     * current time.
     *
     * The clause is started with where(), which in Doctrine's QueryBuilder
     * replaces restrictions added earlier, so additional criteria should be
     * appended after this call.
     *
     * @param QueryBuilder $queryBuilder Builder to modify in place
     * @param mixed        $status       Status value to match (defaults to BaseJob::STATUS_NEW)
     */
    protected function addStandardPredicate(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
    {
        $now = Util::getMicrotimeDateTime();
        $nowDecimal = Util::getMicrotimeDecimalFormat($now);
        $expr = $queryBuilder->expr();

        $dueOrUnscheduled = $expr->orX(
            $expr->isNull('j.whenUs'),
            $expr->lte('j.whenUs', ':whenUs')
        );
        $notExpired = $expr->orX(
            $expr->isNull('j.expiresAt'),
            $expr->gt('j.expiresAt', ':expiresAt')
        );

        $queryBuilder->where('j.status = :status');
        $queryBuilder->andWhere($dueOrUnscheduled);
        $queryBuilder->andWhere($notExpired);

        $queryBuilder->setParameter('status', $status);
        $queryBuilder->setParameter('whenUs', $nowDecimal);
        $queryBuilder->setParameter('expiresAt', $now);
    }
282
283
    /**
     * Tries to claim a single job for execution.
     *
     * Issues an UPDATE that sets the status to BaseJob::STATUS_RUNNING (plus
     * startedAt and, when given, runId) for the job with the given id, but only
     * while that job is still in BaseJob::STATUS_NEW. The job is returned only
     * when exactly one row was updated.
     *
     * @param mixed    $jobId Id of the job to claim
     * @param int|null $runId Run id to store on the job when not null
     *
     * @return mixed The result of findRefresh() on success, null otherwise
     */
    protected function takeJob($jobId, $runId = null)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $claimBuilder */
        $claimBuilder = $repository->createQueryBuilder('j');

        $claimBuilder->update();
        $claimBuilder->set('j.status', ':status');
        $claimBuilder->setParameter('status', BaseJob::STATUS_RUNNING);

        if (null !== $runId) {
            $claimBuilder->set('j.runId', ':runId');
            $claimBuilder->setParameter('runId', $runId);
        }

        $claimBuilder->set('j.startedAt', ':startedAt');
        $claimBuilder->setParameter('startedAt', Util::getMicrotimeDateTime());

        // Matching on the NEW status makes the update a no-op if the job was
        // already moved out of that status.
        $claimBuilder->where('j.id = :id');
        $claimBuilder->andWhere('j.status = :statusNew');
        $claimBuilder->setParameter('id', $jobId);
        $claimBuilder->setParameter('statusNew', BaseJob::STATUS_NEW);

        $affectedRows = $claimBuilder->getQuery()->execute();

        return 1 === $affectedRows ? $this->findRefresh($jobId) : null;
    }
314
315 11
    /**
     * Loads a job by id, making sure an already-managed instance is refreshed.
     *
     * When the unit of work already holds an entity of the job class with this
     * id, that instance is refreshed through the entity manager and returned.
     * Otherwise the job is looked up through the repository.
     *
     * @param mixed $id Job identifier
     *
     * @return mixed The refreshed managed entity, or the result of the
     *               repository's find()
     */
    protected function findRefresh($id)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $managedJob = $entityManager->getUnitOfWork()->tryGetById(['id' => $id], $this->getJobClass());
        if (!$managedJob) {
            return $this->getRepository()->find($id);
        }

        $entityManager->refresh($managedJob);

        return $managedJob;
    }
327
328
    /**
     * Tries to update the nearest job as a batch.
     *
     * Looks up the job in BaseJob::STATUS_NEW with the same crcHash and the
     * lowest whenUs. When one exists it is updated, via updateBatchJob(), to
     * the higher of the two priorities and to the incoming job's whenUs if
     * bccomp() reports that value as smaller than the existing one.
     *
     * @param \Dtc\QueueBundle\Model\Job $job Incoming job; must be an ORM Job entity
     *
     * @return null|Job The existing job that was considered, or null when no match was found
     *
     * @throws UnsupportedException when $job is not an instance of the ORM Job entity
     */
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new UnsupportedException('$job must be instance of '.Job::class);
        }

        /** @var QueryBuilder $lookupBuilder */
        $lookupBuilder = $this->getRepository()->createQueryBuilder('j');
        $lookupBuilder->select();
        $lookupBuilder->where('j.crcHash = :crcHash');
        $lookupBuilder->andWhere('j.status = :status');
        $lookupBuilder->setParameter('status', BaseJob::STATUS_NEW);
        $lookupBuilder->setParameter('crcHash', $job->getCrcHash());
        $lookupBuilder->orderBy('j.whenUs', 'ASC');
        $lookupBuilder->setMaxResults(1);

        $matches = $lookupBuilder->getQuery()->execute();
        if (empty($matches)) {
            return null;
        }

        /** @var Job $nearestJob */
        $nearestJob = $matches[0];

        $mergedPriority = max($job->getPriority(), $nearestJob->getPriority());
        $incomingIsEarlier = bccomp($job->getWhenUs(), $nearestJob->getWhenUs()) < 0;
        $mergedWhenUs = $incomingIsEarlier ? $job->getWhenUs() : $nearestJob->getWhenUs();

        $this->updateBatchJob($nearestJob, $mergedPriority, $mergedWhenUs);

        return $nearestJob;
    }
370
371
    /**
     * Applies a new priority and/or whenUs to an existing job.
     *
     * Only values that differ (strictly) from the job's current ones are
     * written: the entity is updated through its setter and the same change is
     * sent to the database with an UPDATE on the job's id. When nothing
     * differs no query is executed.
     *
     * @param Job    $existingJob Job to update
     * @param int    $newPriority Priority the job should have
     * @param string $newWhenUs   whenUs value the job should have
     *
     * @return Job The job that was passed in
     */
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
    {
        $priorityChanged = $newPriority !== $existingJob->getPriority();
        $whenUsChanged = $newWhenUs !== $existingJob->getWhenUs();

        if (!$priorityChanged && !$whenUsChanged) {
            return $existingJob;
        }

        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $updateBuilder */
        $updateBuilder = $repository->createQueryBuilder('j');
        $updateBuilder->update();

        if ($priorityChanged) {
            $existingJob->setPriority($newPriority);
            $updateBuilder->set('j.priority', ':priority');
            $updateBuilder->setParameter('priority', $newPriority);
        }

        if ($whenUsChanged) {
            $existingJob->setWhenUs($newWhenUs);
            $updateBuilder->set('j.whenUs', ':whenUs');
            $updateBuilder->setParameter('whenUs', $newWhenUs);
        }

        $updateBuilder->where('j.id = :id');
        $updateBuilder->setParameter('id', $existingJob->getId());
        $updateBuilder->getQuery()->execute();

        return $existingJob;
    }
403
404 2
    /**
     * Lists the distinct worker/method combinations of the jobs matched by the
     * standard predicate for the given status.
     *
     * @param mixed $status Status passed to addStandardPredicate()
     *                      (defaults to BaseJob::STATUS_NEW)
     *
     * @return array Map of worker name to a list of its method names; empty
     *               when no job matches
     */
    public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $distinctBuilder = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($distinctBuilder, $status);
        $distinctBuilder->select('DISTINCT j.workerName, j.method');

        $workerMethods = [];
        foreach ($distinctBuilder->getQuery()->getArrayResult() as $row) {
            $workerMethods[$row['workerName']][] = $row['method'];
        }

        return $workerMethods;
    }
424
425
    /**
     * Counts the jobs matched by the standard predicate, optionally limited to
     * one worker and/or method.
     *
     * @param string|null $workerName Only count jobs for this worker when not null
     * @param string|null $methodName Only count jobs for this method when not null
     *
     * @return mixed Scalar count as returned by Doctrine's getSingleScalarResult()
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();

        $countBuilder = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($countBuilder);
        $this->addWorkerNameCriterion($countBuilder, $workerName, $methodName);
        $countBuilder->select('count(j.id)');

        $countQuery = $countBuilder->getQuery();

        return $countQuery->getSingleScalarResult();
    }
440
441
    /**
     * Archives all jobs matched by the standard predicate.
     *
     * First sets the status of the matching jobs (optionally limited to one
     * worker and/or method) to Job::STATUS_ARCHIVE, then calls runArchive() if
     * the update reported a truthy result.
     *
     * @param string|null   $workerName       Only archive jobs for this worker when not null
     * @param string|null   $methodName       Only archive jobs for this method when not null
     * @param callable|null $progressCallback Passed through to runArchive()
     */
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        // First mark all Live non-running jobs as Archive
        $repository = $this->getRepository();
        /** @var QueryBuilder $markBuilder */
        $markBuilder = $repository->createQueryBuilder('j');
        $markBuilder->update($this->getJobClass(), 'j');
        $markBuilder->set('j.status', ':statusArchive');
        $markBuilder->setParameter('statusArchive', Job::STATUS_ARCHIVE);
        $this->addStandardPredicate($markBuilder);
        $this->addWorkerNameCriterion($markBuilder, $workerName, $methodName);

        $markedCount = $markBuilder->getQuery()->execute();
        if (!$markedCount) {
            return;
        }

        $this->runArchive($workerName, $methodName, $progressCallback);
    }
463
464
    /**
     * Removes the jobs that are in 'archive' status through the entity manager.
     *
     * Jobs are read in batches of up to 10000 rows. Each row's entity is loaded
     * from the repository and passed to EntityManager::remove(); changes are
     * flushed after every 10 processed rows and once more at the end of each
     * batch, and updateProgress() is called after every flush. Batches repeat
     * for as long as a full batch of 10000 rows was returned.
     *
     * NOTE(review): presumably removing a job is what moves it to the archive
     * table (handled outside this method) - confirm.
     *
     * @param string|null   $workerName       Only process jobs for this worker when not null
     * @param string|null   $methodName       Only process jobs for this method when not null
     * @param callable|null $progressCallback Passed to updateProgress() together with the running count
     */
    protected function runArchive($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $count = 0;
        do {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            $queryBuilder = $repository->createQueryBuilder('j');
            $queryBuilder->where('j.status = :status')
                ->setParameter('status', Job::STATUS_ARCHIVE)
                ->setMaxResults(10000);
            // Honour the worker/method filters the caller passes in; they were
            // previously accepted but never applied to the query.
            $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);

            $results = $queryBuilder->getQuery()->getArrayResult();
            foreach ($results as $jobRow) {
                $job = $repository->find($jobRow['id']);
                if ($job) {
                    $entityManager->remove($job);
                }
                ++$count;
                // Flush in small groups so pending removals do not pile up.
                if (0 === $count % 10) {
                    $this->flush();
                    $this->updateProgress($progressCallback, $count);
                }
            }
            $this->flush();
            $this->updateProgress($progressCallback, $count);
            // A full batch means there may be more rows left to process.
        } while (10000 === count($results));
    }
503
}
504