Completed
Branch feature/new-scrutinizer (53aa9f)
by Matthew
03:01
created

JobManager::getWorkersAndMethods()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 20
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3.512

Importance

Changes 0
Metric Value
dl 0
loc 20
ccs 8
cts 13
cp 0.6153
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 13
nc 3
nop 1
crap 3.512
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Util\Util;
13
use Symfony\Component\Process\Exception\LogicException;
14
15
class JobManager extends DoctrineJobManager
16
{
17
    use CommonTrait;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20
21 31
    public function getObjectManager()
22
    {
23 31
        return $this->getObjectManagerReset();
24
    }
25
26 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
27
    {
28
        /** @var EntityManager $objectManager */
29 3
        $objectManager = $this->getObjectManager();
30
31
        $queryBuilder = $objectManager
32 3
            ->createQueryBuilder()
33 3
            ->select('count(a.id)')
34 3
            ->from($objectName, 'a')
35 3
            ->where('a.status = :status');
36
37 3
        if (null !== $workerName) {
38 1
            $queryBuilder->andWhere('a.workerName = :workerName')
39 1
                ->setParameter('workerName', $workerName);
40 1
        }
41
42 3
        if (null !== $method) {
43 1
            $queryBuilder->andWhere('a.method = :method')
44 1
                ->setParameter('method', $workerName);
45 1
        }
46
47 3
        $count = $queryBuilder->setParameter('status', $status)
48 3
            ->getQuery()->getSingleScalarResult();
49
50 3
        if (!$count) {
51 1
            return 0;
52
        }
53
54 3
        return $count;
55
    }
56
57
    /**
58
     * @param string|null $workerName
59
     * @param string|null $method
60
     *
61
     * @return int Count of jobs pruned
62
     */
63 2
    public function pruneExceptionJobs($workerName = null, $method = null)
64
    {
65
        /** @var EntityManager $objectManager */
66 1
        $objectManager = $this->getObjectManager();
67 1
        $queryBuilder = $objectManager->createQueryBuilder()->delete($this->getJobArchiveClass(), 'j');
68 1
        $queryBuilder->where('j.status = :status')
69 1
            ->setParameter('status', BaseJob::STATUS_EXCEPTION);
70
71 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
72 2
        $query = $queryBuilder->getQuery();
73
74 1
        return intval($query->execute());
75
    }
76
77 21
    protected function resetSaveOk($function)
78
    {
79 21
        $objectManager = $this->getObjectManager();
80 21
        $splObjectHash = spl_object_hash($objectManager);
81
82 21
        if ('save' === $function) {
83
            $compare = static::$resetInsertCalled;
84
        } else {
85 21
            $compare = static::$saveInsertCalled;
86
        }
87
88 21
        if ($splObjectHash === $compare) {
89
            // Insert SQL is cached...
90
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
91
            throw new LogicException($msg);
92
        }
93
94 21
        if ('save' === $function) {
95
            static::$saveInsertCalled = spl_object_hash($objectManager);
96
        } else {
97 21
            static::$resetInsertCalled = spl_object_hash($objectManager);
98
        }
99 21
    }
100
101
    /**
102
     * @param string $workerName
103
     * @param string $method
104
     */
105 17
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
106
    {
107 17
        if (null !== $workerName) {
108 5
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter('workerName', $workerName);
109 5
        }
110
111 17
        if (null !== $method) {
112 4
            $queryBuilder->andWhere('j.method = :method')->setParameter('method', $method);
113 4
        }
114 17
    }
115
116 1
    protected function updateExpired($workerName = null, $method = null)
117
    {
118
        /** @var EntityManager $objectManager */
119 1
        $objectManager = $this->getObjectManager();
120 1
        $queryBuilder = $objectManager->createQueryBuilder()->update($this->getJobClass(), 'j');
121 1
        $queryBuilder->set('j.status', ':newStatus');
122 1
        $queryBuilder->where('j.expiresAt <= :expiresAt')
123 1
            ->setParameter('expiresAt', Util::getMicrotimeDateTime());
124 1
        $queryBuilder->andWhere('j.status = :status')
125 1
            ->setParameter('status', BaseJob::STATUS_NEW)
126 1
            ->setParameter('newStatus', Job::STATUS_EXPIRED);
127
128 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
129 1
        $query = $queryBuilder->getQuery();
130
131 1
        return intval($query->execute());
132
    }
133
134
    /**
135
     * Removes archived jobs older than $olderThan.
136
     *
137
     * @param \DateTime $olderThan
138
     */
139 1
    public function pruneArchivedJobs(\DateTime $olderThan)
140
    {
141 1
        return $this->removeOlderThan(
142 1
            $this->getJobArchiveClass(),
143 1
                'updatedAt',
144
                $olderThan
145 1
        );
146
    }
147
148 2 View Code Duplication
    public function getWaitingJobCount($workerName = null, $method = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
149
    {
150
        /** @var EntityManager $objectManager */
151 2
        $objectManager = $this->getObjectManager();
152 2
        $queryBuilder = $objectManager->createQueryBuilder();
153
154 2
        $queryBuilder = $queryBuilder->select('count(j)')->from($this->getJobClass(), 'j');
155
156 2
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
157 2
        $this->addStandardPredicate($queryBuilder);
158
159 2
        $query = $queryBuilder->getQuery();
160
161 2
        return $query->getSingleScalarResult();
162
    }
163
164
    /**
165
     * Get Jobs statuses.
166
     */
167 3
    public function getStatus()
168
    {
169 3
        $result = [];
170 3
        $this->getStatusByEntityName($this->getJobClass(), $result);
171 3
        $this->getStatusByEntityName($this->getJobArchiveClass(), $result);
172
173 3
        $finalResult = [];
174 3
        foreach ($result as $key => $item) {
175 1
            ksort($item);
176 1
            foreach ($item as $status => $count) {
177 1
                if (isset($finalResult[$key][$status])) {
178
                    $finalResult[$key][$status] += $count;
179
                } else {
180 1
                    $finalResult[$key][$status] = $count;
181
                }
182 1
            }
183 3
        }
184
185 3
        return $finalResult;
186
    }
187
188
    /**
189
     * @param string $entityName
190
     */
191 3
    protected function getStatusByEntityName($entityName, array &$result)
192
    {
193
        /** @var EntityManager $objectManager */
194 3
        $objectManager = $this->getObjectManager();
195 3
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
196 3
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
197
198 3
        foreach ($result1 as $item) {
199 1
            $method = $item['workerName'].'->'.$item['method'].'()';
200 1
            if (!isset($result[$method])) {
201 1
                $result[$method] = static::getAllStatuses();
202 1
            }
203 1
            $result[$method][$item['status']] += intval($item['c']);
204 3
        }
205 3
    }
206
207
    /**
208
     * Get the next job to run (can be filtered by workername and method name).
209
     *
210
     * @param string $workerName
211
     * @param string $methodName
212
     * @param bool   $prioritize
213
     * @param int    $runId
214
     *
215
     * @return Job|null
216
     */
217 14
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
218
    {
219
        do {
220 14
            $queryBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
221 14
            $queryBuilder->select('j.id');
222 14
            $queryBuilder->setMaxResults(100);
223
224
            /** @var QueryBuilder $queryBuilder */
225 14
            $query = $queryBuilder->getQuery();
226 14
            $jobs = $query->getResult();
227 14
            if ($jobs) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $jobs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
228 11
                foreach ($jobs as $job) {
229 11
                    if ($job = $this->takeJob($job['id'], $runId)) {
230 11
                        return $job;
231
                    }
232
                }
233
            }
234 9
        } while ($jobs);
0 ignored issues
show
Bug Best Practice introduced by
The expression $jobs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
235
236 9
        return null;
237
    }
238
239
    /**
240
     * @param string|null $workerName
241
     * @param string|null $methodName
242
     * @param bool        $prioritize
243
     *
244
     * @return QueryBuilder
245
     */
246 14
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
247
    {
248
        /** @var EntityRepository $repository */
249 14
        $repository = $this->getRepository();
250 14
        $queryBuilder = $repository->createQueryBuilder('j');
251 14
        $this->addStandardPredicate($queryBuilder);
252 14
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
253
254 14
        if ($prioritize) {
255 14
            $queryBuilder->addOrderBy('j.priority', 'DESC');
256 14
            $queryBuilder->addOrderBy('j.whenUs', 'ASC');
257 14
        } else {
258 1
            $queryBuilder->orderBy('j.whenUs', 'ASC');
259
        }
260
261 14
        return $queryBuilder;
262
    }
263
264 17
    protected function addStandardPredicate(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
265
    {
266 17
        $dateTime = Util::getMicrotimeDateTime();
267 17
        $decimal = Util::getMicrotimeDecimalFormat($dateTime);
268
269
        $queryBuilder
270 17
            ->where('j.status = :status')->setParameter('status', $status)
271 17
            ->andWhere($queryBuilder->expr()->orX(
272 17
                $queryBuilder->expr()->isNull('j.whenUs'),
273 17
                $queryBuilder->expr()->lte('j.whenUs', ':whenUs')
274 17
            ))
275 17
            ->andWhere($queryBuilder->expr()->orX(
276 17
                $queryBuilder->expr()->isNull('j.expiresAt'),
277 17
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
278 17
            ))
279 17
            ->setParameter('whenUs', $decimal)
280 17
            ->setParameter('expiresAt', $dateTime);
281 17
    }
282
283
    /**
284
     * @param int $runId
285
     */
286 11
    protected function takeJob($jobId, $runId = null)
287
    {
288
        /** @var EntityRepository $repository */
289 11
        $repository = $this->getRepository();
290
        /** @var QueryBuilder $queryBuilder */
291 11
        $queryBuilder = $repository->createQueryBuilder('j');
292
        $queryBuilder
293 11
            ->update()
294 11
            ->set('j.status', ':status')
295 11
            ->setParameter('status', BaseJob::STATUS_RUNNING);
296 11
        if (null !== $runId) {
297
            $queryBuilder
298 1
                ->set('j.runId', ':runId')
299 1
                ->setParameter('runId', $runId);
300 1
        }
301 11
        $queryBuilder->set('j.startedAt', ':startedAt')
302 11
            ->setParameter('startedAt', Util::getMicrotimeDateTime());
303 11
        $queryBuilder->where('j.id = :id');
304 11
        $queryBuilder->setParameter('id', $jobId);
305 11
        $queryBuilder->andWhere('j.status = :statusNew')->setParameter('statusNew', BaseJob::STATUS_NEW);
306 11
        $resultCount = $queryBuilder->getQuery()->execute();
307
308 11
        if (1 === $resultCount) {
309 11
            return $this->findRefresh($jobId);
310
        }
311
312
        return null;
313
    }
314
315 11
    protected function findRefresh($id)
316
    {
317
        /** @var EntityManager $entityManager */
318 11
        $entityManager = $this->getObjectManager();
319 11
        if ($job = $entityManager->getUnitOfWork()->tryGetById(['id' => $id], $this->getJobClass())) {
320 11
            $entityManager->refresh($job);
0 ignored issues
show
Bug introduced by
It seems like $job defined by $entityManager->getUnitO..., $this->getJobClass()) on line 319 can also be of type boolean; however, Doctrine\ORM\EntityManager::refresh() does only seem to accept object, maybe add an additional type check?

If a method or function can return multiple different values and unless you are sure that you only can receive a single value in this context, we recommend to add an additional type check:

/**
 * @return array|string
 */
function returnsDifferentValues($x) {
    if ($x) {
        return 'foo';
    }

    return array();
}

$x = returnsDifferentValues($y);
if (is_array($x)) {
    // $x is an array.
}

If this a common case that PHP Analyzer should handle natively, please let us know by opening an issue.

Loading history...
321
322 11
            return $job;
323
        }
324
325
        return $this->getRepository()->find($id);
326
    }
327
328
    /**
329
     * Tries to update the nearest job as a batch.
330
     *
331
     * @param \Dtc\QueueBundle\Model\Job $job
332
     *
333
     * @return null|Job
334
     */
335 1
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
336
    {
337 1
        if (!$job instanceof Job) {
338
            throw new UnsupportedException('$job must be instance of '.Job::class);
339
        }
340
341
        /** @var QueryBuilder $queryBuilder */
342 1
        $queryBuilder = $this->getRepository()->createQueryBuilder('j');
343 1
        $queryBuilder->select()
344 1
            ->where('j.crcHash = :crcHash')
345 1
            ->andWhere('j.status = :status')
346 1
            ->setParameter('status', BaseJob::STATUS_NEW)
347 1
            ->setParameter('crcHash', $job->getCrcHash())
348 1
            ->orderBy('j.whenUs', 'ASC')
349 1
            ->setMaxResults(1);
350 1
        $existingJobs = $queryBuilder->getQuery()->execute();
351
352 1
        if (empty($existingJobs)) {
353
            return null;
354
        }
355
356
        /** @var Job $existingJob */
357 1
        $existingJob = $existingJobs[0];
358
359 1
        $newPriority = max($job->getPriority(), $existingJob->getPriority());
360 1
        $newWhenUs = $existingJob->getWhenUs();
361 1
        $bcResult = bccomp($job->getWhenUs(), $existingJob->getWhenUs());
362 1
        if ($bcResult < 0) {
363 1
            $newWhenUs = $job->getWhenUs();
364 1
        }
365
366 1
        $this->updateBatchJob($existingJob, $newPriority, $newWhenUs);
367
368 1
        return $existingJob;
369
    }
370
371
    /**
372
     * @param int    $newPriority
373
     * @param string $newWhenUs
374
     */
375 1
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
376
    {
377 1
        $existingPriority = $existingJob->getPriority();
378 1
        $existingWhenUs = $existingJob->getWhenUs();
379
380 1
        if ($newPriority !== $existingPriority || $newWhenUs !== $existingWhenUs) {
381
            /** @var EntityRepository $repository */
382 1
            $repository = $this->getRepository();
383
            /** @var QueryBuilder $queryBuilder */
384 1
            $queryBuilder = $repository->createQueryBuilder('j');
385 1
            $queryBuilder->update();
386 1
            if ($newPriority !== $existingPriority) {
387 1
                $existingJob->setPriority($newPriority);
388 1
                $queryBuilder->set('j.priority', ':priority')
389 1
                    ->setParameter('priority', $newPriority);
390 1
            }
391 1
            if ($newWhenUs !== $existingWhenUs) {
392 1
                $existingJob->setWhenUs($newWhenUs);
393 1
                $queryBuilder->set('j.whenUs', ':whenUs')
394 1
                    ->setParameter('whenUs', $newWhenUs);
395 1
            }
396 1
            $queryBuilder->where('j.id = :id');
397 1
            $queryBuilder->setParameter('id', $existingJob->getId());
398 1
            $queryBuilder->getQuery()->execute();
399 1
        }
400
401 1
        return $existingJob;
402
    }
403
404 2
    public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
405
    {
406
        /** @var EntityRepository $repository */
407 2
        $repository = $this->getRepository();
408 2
        $queryBuilder = $repository->createQueryBuilder('j');
409 2
        $this->addStandardPredicate($queryBuilder, $status);
410
        $queryBuilder
411 2
            ->select('DISTINCT j.workerName, j.method');
412
413 2
        $results = $queryBuilder->getQuery()->getArrayResult();
414 2
        if (empty($results)) {
415 2
            return [];
416
        }
417
        $workerMethods = [];
418
        foreach ($results as $result) {
419
            $workerMethods[$result['workerName']][] = $result['method'];
420
        }
421
422
        return $workerMethods;
423
    }
424
425
    /**
426
     * @param string $workerName
427
     * @param string $methodName
428
     */
429 2
    public function countLiveJobs($workerName = null, $methodName = null)
430
    {
431
        /** @var EntityRepository $repository */
432 2
        $repository = $this->getRepository();
433 2
        $queryBuilder = $repository->createQueryBuilder('j');
434 2
        $this->addStandardPredicate($queryBuilder);
435 2
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
436 2
        $queryBuilder->select('count(j.id)');
437
438 2
        return $queryBuilder->getQuery()->getSingleScalarResult();
439
    }
440
441
    /**
442
     * @param string        $workerName
443
     * @param string        $methodName
444
     * @param callable|null $progressCallback
445
     */
446 1
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
447
    {
448
        // First mark all Live non-running jobs as Archive
449 1
        $repository = $this->getRepository();
450
        /** @var QueryBuilder $queryBuilder */
451 1
        $queryBuilder = $repository->createQueryBuilder('j');
452 1
        $queryBuilder->update($this->getJobClass(), 'j')
453 1
            ->set('j.status', ':statusArchive')
454 1
            ->setParameter('statusArchive', Job::STATUS_ARCHIVE);
455 1
        $this->addStandardPredicate($queryBuilder);
456 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
457 1
        $resultCount = $queryBuilder->getQuery()->execute();
458
459 1
        if ($resultCount) {
460 1
            $this->runArchive($workerName, $methodName, $progressCallback);
461 1
        }
462 1
    }
463
464
    /**
465
     * Move jobs in 'archive' status to the archive table.
466
     *
467
     *  This is a bit of a hack to run a lower level query so as to process the INSERT INTO SELECT
468
     *   All on the server as "INSERT INTO SELECT" is not supported natively in Doctrine.
469
     *
470
     * @param string|null   $workerName
471
     * @param string|null   $methodName
472
     * @param callable|null $progressCallback
473
     */
474 1
    protected function runArchive($workerName = null, $methodName = null, callable $progressCallback = null)
0 ignored issues
show
Unused Code introduced by
The parameter $workerName is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $methodName is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
475
    {
476
        /** @var EntityManager $entityManager */
477 1
        $entityManager = $this->getObjectManager();
478 1
        $count = 0;
479
        do {
480
            /** @var EntityRepository $repository */
481 1
            $repository = $this->getRepository();
482 1
            $queryBuilder = $repository->createQueryBuilder('j');
483 1
            $queryBuilder->where('j.status = :status')
484 1
                ->setParameter('status', Job::STATUS_ARCHIVE)
485 1
                ->setMaxResults(10000);
486
487 1
            $results = $queryBuilder->getQuery()->getArrayResult();
488 1
            foreach ($results as $jobRow) {
489 1
                $job = $repository->find($jobRow['id']);
490 1
                if ($job) {
491 1
                    $entityManager->remove($job);
492 1
                }
493 1
                ++$count;
494 1
                if (0 == $count % 10) {
495
                    $this->flush();
496
                    $this->updateProgress($progressCallback, $count);
497
                }
498 1
            }
499 1
            $this->flush();
500 1
            $this->updateProgress($progressCallback, $count);
501 1
        } while (!empty($results) && 10000 == count($results));
502 1
    }
503
}
504