Completed
Pull Request — master (#30)
by Matthew
48:21 queued 02:09
created

JobManager::takeJob()   B

Complexity

Conditions 3
Paths 4

Size

Total Lines 27
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 3.0017

Importance

Changes 0
Metric Value
dl 0
loc 27
ccs 16
cts 17
cp 0.9412
rs 8.8571
c 0
b 0
f 0
cc 3
eloc 19
nc 4
nop 2
crap 3.0017
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Model\StallableJob;
14
use Dtc\QueueBundle\Util\Util;
15
use Symfony\Component\Process\Exception\LogicException;
16
17
class JobManager extends DoctrineJobManager
18
{
19
    use CommonTrait;
20
    protected static $saveInsertCalled = null;
21
    protected static $resetInsertCalled = null;
22
23 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
24
    {
25
        /** @var EntityManager $objectManager */
26 3
        $objectManager = $this->getObjectManager();
27
28
        $queryBuilder = $objectManager
29 3
            ->createQueryBuilder()
30 3
            ->select('count(a.id)')
31 3
            ->from($objectName, 'a')
32 3
            ->where('a.status = :status');
33
34 3
        if (null !== $workerName) {
35 1
            $queryBuilder->andWhere('a.workerName = :workerName')
36 1
                ->setParameter(':workerName', $workerName);
37
        }
38
39 3
        if (null !== $method) {
40 1
            $queryBuilder->andWhere('a.method = :method')
41 1
                ->setParameter(':method', $workerName);
42
        }
43
44 3
        $count = $queryBuilder->setParameter(':status', $status)
45 3
            ->getQuery()->getSingleScalarResult();
46
47 3
        if (!$count) {
48 1
            return 0;
49
        }
50
51 3
        return $count;
52
    }
53
54
    /**
55
     * @param string|null $workerName
56
     * @param string|null $method
57
     *
58
     * @return int Count of jobs pruned
59
     */
60 1
    public function pruneExceptionJobs($workerName = null, $method = null)
61
    {
62
        /** @var EntityManager $objectManager */
63 1
        $objectManager = $this->getObjectManager();
64 1
        $queryBuilder = $objectManager->createQueryBuilder()->delete($this->getJobArchiveClass(), 'j');
65 1
        $queryBuilder->where('j.status = :status')
66 1
            ->setParameter(':status', BaseJob::STATUS_EXCEPTION);
67
68 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
69 1
        $query = $queryBuilder->getQuery();
70
71 1
        return intval($query->execute());
72
    }
73
74 21
    protected function resetSaveOk($function)
75
    {
76 21
        $objectManager = $this->getObjectManager();
77 21
        $splObjectHash = spl_object_hash($objectManager);
78
79 21
        if ('save' === $function) {
80
            $compare = static::$resetInsertCalled;
81
        } else {
82 21
            $compare = static::$saveInsertCalled;
83
        }
84
85 21
        if ($splObjectHash === $compare) {
86
            // Insert SQL is cached...
87
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
88
            throw new LogicException($msg);
89
        }
90
91 21
        if ('save' === $function) {
92
            static::$saveInsertCalled = spl_object_hash($objectManager);
93
        } else {
94 21
            static::$resetInsertCalled = spl_object_hash($objectManager);
95
        }
96 21
    }
97
98
    /**
99
     * @param string $workerName
100
     * @param string $method
101
     */
102 13
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
103
    {
104 13
        if (null !== $workerName) {
105 5
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
106
        }
107
108 13
        if (null !== $method) {
109 4
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
110
        }
111 13
    }
112
113 1
    protected function updateExpired($workerName = null, $method = null)
114
    {
115
        /** @var EntityManager $objectManager */
116 1
        $objectManager = $this->getObjectManager();
117 1
        $queryBuilder = $objectManager->createQueryBuilder()->update($this->getJobClass(), 'j');
118 1
        $queryBuilder->set('j.status', ':newStatus');
119 1
        $queryBuilder->where('j.expiresAt <= :expiresAt')
120 1
            ->setParameter(':expiresAt', Util::getMicrotimeDateTime());
121 1
        $queryBuilder->andWhere('j.status = :status')
122 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
123 1
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
124
125 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
126 1
        $query = $queryBuilder->getQuery();
127
128 1
        return intval($query->execute());
129
    }
130
131
    /**
132
     * Removes archived jobs older than $olderThan.
133
     *
134
     * @param \DateTime $olderThan
135
     */
136 1
    public function pruneArchivedJobs(\DateTime $olderThan)
137
    {
138 1
        return $this->removeOlderThan(
139 1
            $this->getJobArchiveClass(),
140 1
                'updatedAt',
141 1
                $olderThan
142
        );
143
    }
144
145 2
    public function getJobCount($workerName = null, $method = null)
146
    {
147
        /** @var EntityManager $objectManager */
148 2
        $objectManager = $this->getObjectManager();
149 2
        $queryBuilder = $objectManager->createQueryBuilder();
150
151 2
        $queryBuilder = $queryBuilder->select('count(j)')->from($this->getJobClass(), 'j');
152
153 2
        $where = 'where';
154 2
        if (null !== $workerName) {
155
            if (null !== $method) {
156
                $queryBuilder->where($queryBuilder->expr()->andX(
157
                    $queryBuilder->expr()->eq('j.workerName', ':workerName'),
158
                                                $queryBuilder->expr()->eq('j.method', ':method')
159
                ))
160
                    ->setParameter(':method', $method);
161
            } else {
162
                $queryBuilder->where('j.workerName = :workerName');
163
            }
164
            $queryBuilder->setParameter(':workerName', $workerName);
165
            $where = 'andWhere';
166 2
        } elseif (null !== $method) {
167
            $queryBuilder->where('j.method = :method')->setParameter(':method', $method);
168
            $where = 'andWhere';
169
        }
170
171
        // Filter
172
        $queryBuilder
173 2
            ->$where($queryBuilder->expr()->orX(
174 2
                $queryBuilder->expr()->isNull('j.whenUs'),
175 2
                                        $queryBuilder->expr()->lte('j.whenUs', ':whenUs')
176
            ))
177 2
            ->andWhere($queryBuilder->expr()->orX(
178 2
                $queryBuilder->expr()->isNull('j.expiresAt'),
179 2
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
180
            ))
181 2
            ->setParameter(':whenUs', Util::getMicrotimeDecimal())
182 2
            ->setParameter(':expiresAt', Util::getMicrotimeDateTime());
183
184 2
        $query = $queryBuilder->getQuery();
185
186 2
        return $query->getSingleScalarResult();
187
    }
188
189
    /**
190
     * Get Jobs statuses.
191
     */
192 3
    public function getStatus()
193
    {
194 3
        $result = [];
195 3
        $this->getStatusByEntityName($this->getJobClass(), $result);
196 3
        $this->getStatusByEntityName($this->getJobArchiveClass(), $result);
197
198 3
        $finalResult = [];
199 3
        foreach ($result as $key => $item) {
200 1
            ksort($item);
201 1
            foreach ($item as $status => $count) {
202 1
                if (isset($finalResult[$key][$status])) {
203
                    $finalResult[$key][$status] += $count;
204
                } else {
205 1
                    $finalResult[$key][$status] = $count;
206
                }
207
            }
208
        }
209
210 3
        return $finalResult;
211
    }
212
213
    /**
214
     * @param string $entityName
215
     */
216 3
    protected function getStatusByEntityName($entityName, array &$result)
217
    {
218
        /** @var EntityManager $objectManager */
219 3
        $objectManager = $this->getObjectManager();
220 3
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
221 3
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
222
223 3
        foreach ($result1 as $item) {
224 1
            $method = $item['workerName'].'->'.$item['method'].'()';
225 1
            if (!isset($result[$method])) {
226 1
                $result[$method] = [BaseJob::STATUS_NEW => 0,
227
                    BaseJob::STATUS_RUNNING => 0,
228
                    BaseJob::STATUS_SUCCESS => 0,
229
                    BaseJob::STATUS_FAILURE => 0,
230
                    BaseJob::STATUS_EXCEPTION => 0,
231
                    StallableJob::STATUS_STALLED => 0,
232
                    \Dtc\QueueBundle\Model\Job::STATUS_EXPIRED => 0,
233
                    RetryableJob::STATUS_MAX_FAILURES => 0,
234
                    RetryableJob::STATUS_MAX_EXCEPTIONS => 0,
235
                    StallableJob::STATUS_MAX_STALLS => 0,
236
                    RetryableJob::STATUS_MAX_RETRIES => 0, ];
237
            }
238 1
            $result[$method][$item['status']] += intval($item['c']);
239
        }
240 3
    }
241
242
    /**
243
     * Get the next job to run (can be filtered by workername and method name).
244
     *
245
     * @param string $workerName
246
     * @param string $methodName
247
     * @param bool   $prioritize
248
     * @param int    $runId
249
     *
250
     * @return Job|null
251
     */
252 11
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
253
    {
254
        do {
255 11
            $queryBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
256 11
            $queryBuilder->select('j.id');
257 11
            $queryBuilder->setMaxResults(100);
258
259
            /** @var QueryBuilder $queryBuilder */
260 11
            $query = $queryBuilder->getQuery();
261 11
            $jobs = $query->getResult();
262 11
            if ($jobs) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $jobs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
263 9
                foreach ($jobs as $job) {
264 9
                    if ($job = $this->takeJob($job['id'], $runId)) {
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $job is correct as $this->takeJob($job['id'], $runId) (which targets Dtc\QueueBundle\ORM\JobManager::takeJob()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
265 9
                        return $job;
266
                    }
267
                }
268
            }
269 6
        } while ($jobs);
0 ignored issues
show
Bug Best Practice introduced by
The expression $jobs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
270
271 6
        return null;
272
    }
273
274
    /**
275
     * @param string|null $workerName
276
     * @param string|null $methodName
277
     * @param bool        $prioritize
278
     *
279
     * @return QueryBuilder
280
     */
281 11
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
282
    {
283
        /** @var EntityRepository $repository */
284 11
        $repository = $this->getRepository();
285 11
        $queryBuilder = $repository->createQueryBuilder('j');
286 11
        $this->addStandardPredicate($queryBuilder);
287 11
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
288
289 11
        if ($prioritize) {
290 11
            $queryBuilder->addOrderBy('j.priority', 'DESC');
291 11
            $queryBuilder->addOrderBy('j.whenUs', 'ASC');
292
        } else {
293 1
            $queryBuilder->orderBy('j.whenUs', 'ASC');
294
        }
295
296 11
        return $queryBuilder;
297
    }
298
299 12
    protected function addStandardPredicate(QueryBuilder $queryBuilder)
300
    {
301 12
        $dateTime = Util::getMicrotimeDateTime();
302 12
        $decimal = Util::getMicrotimeDecimalFormat($dateTime);
303
304
        $queryBuilder
305 12
            ->where('j.status = :status')->setParameter(':status', BaseJob::STATUS_NEW)
306 12
            ->andWhere($queryBuilder->expr()->orX(
307 12
                $queryBuilder->expr()->isNull('j.whenUs'),
308 12
                $queryBuilder->expr()->lte('j.whenUs', ':whenUs')
309
            ))
310 12
            ->andWhere($queryBuilder->expr()->orX(
311 12
                $queryBuilder->expr()->isNull('j.expiresAt'),
312 12
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
313
            ))
314 12
            ->setParameter(':whenUs', $decimal)
315 12
            ->setParameter(':expiresAt', $dateTime);
316 12
    }
317
318
    /**
319
     * @param int $runId
320
     */
321 9
    protected function takeJob($jobId, $runId = null)
322
    {
323
        /** @var EntityRepository $repository */
324 9
        $repository = $this->getRepository();
325
        /** @var QueryBuilder $queryBuilder */
326 9
        $queryBuilder = $repository->createQueryBuilder('j');
327
        $queryBuilder
328 9
            ->update()
329 9
            ->set('j.status', ':status')
330 9
            ->setParameter(':status', BaseJob::STATUS_RUNNING);
331 9
        if (null !== $runId) {
332
            $queryBuilder
333 1
                ->set('j.runId', ':runId')
334 1
                ->setParameter(':runId', $runId);
335
        }
336 9
        $queryBuilder->set('j.startedAt', ':startedAt')
337 9
            ->setParameter(':startedAt', Util::getMicrotimeDateTime());
338 9
        $queryBuilder->where('j.id = :id');
339 9
        $queryBuilder->setParameter(':id', $jobId);
340 9
        $resultCount = $queryBuilder->getQuery()->execute();
341
342 9
        if (1 === $resultCount) {
343 9
            return $repository->find($jobId);
344
        }
345
346
        return null;
347
    }
348
349
    /**
350
     * Tries to update the nearest job as a batch.
351
     *
352
     * @param \Dtc\QueueBundle\Model\Job $job
353
     *
354
     * @return null|Job
355
     */
356 1
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
357
    {
358 1
        if (!$job instanceof Job) {
359
            throw new UnsupportedException('$job must be instance of '.Job::class);
360
        }
361
362
        /** @var QueryBuilder $queryBuilder */
363 1
        $queryBuilder = $this->getRepository()->createQueryBuilder('j');
364 1
        $queryBuilder->select()
365 1
            ->where('j.crcHash = :crcHash')
366 1
            ->andWhere('j.status = :status')
367 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
368 1
            ->setParameter(':crcHash', $job->getCrcHash())
369 1
            ->orderBy('j.whenUs', 'ASC')
370 1
            ->setMaxResults(1);
371 1
        $existingJobs = $queryBuilder->getQuery()->execute();
372
373 1
        if (empty($existingJobs)) {
374
            return null;
375
        }
376
377
        /** @var Job $existingJob */
378 1
        $existingJob = $existingJobs[0];
379
380 1
        $newPriority = max($job->getPriority(), $existingJob->getPriority());
381 1
        $newWhenUs = $existingJob->getWhenUs();
382 1
        $bcResult = bccomp($job->getWhenUs(), $existingJob->getWhenUs());
383 1
        if ($bcResult < 0) {
384 1
            $newWhenUs = $job->getWhenUs();
385
        }
386
387 1
        $this->updateBatchJob($existingJob, $newPriority, $newWhenUs);
388
389 1
        return $existingJob;
390
    }
391
392
    /**
393
     * @param int    $newPriority
394
     * @param string $newWhenUs
395
     */
396 1
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
397
    {
398 1
        $existingPriority = $existingJob->getPriority();
399 1
        $existingWhenUs = $existingJob->getWhenUs();
400
401 1
        if ($newPriority !== $existingPriority || $newWhenUs !== $existingWhenUs) {
402
            /** @var EntityRepository $repository */
403 1
            $repository = $this->getRepository();
404
            /** @var QueryBuilder $queryBuilder */
405 1
            $queryBuilder = $repository->createQueryBuilder('j');
406 1
            $queryBuilder->update();
407 1
            if ($newPriority !== $existingPriority) {
408 1
                $existingJob->setPriority($newPriority);
409 1
                $queryBuilder->set('j.priority', ':priority')
410 1
                    ->setParameter(':priority', $newPriority);
411
            }
412 1
            if ($newWhenUs !== $existingWhenUs) {
413 1
                $existingJob->setWhenUs($newWhenUs);
414 1
                $queryBuilder->set('j.whenUs', ':whenUs')
415 1
                    ->setParameter(':whenUs', $newWhenUs);
416
            }
417 1
            $queryBuilder->where('j.id = :id');
418 1
            $queryBuilder->setParameter(':id', $existingJob->getId());
419 1
            $queryBuilder->getQuery()->execute();
420
        }
421
422 1
        return $existingJob;
423
    }
424
425 1
    public function getWorkersAndMethods()
426
    {
427
        /** @var EntityRepository $repository */
428 1
        $repository = $this->getRepository();
429 1
        $queryBuilder = $repository->createQueryBuilder('j');
430 1
        $this->addStandardPredicate($queryBuilder);
431
        $queryBuilder
432 1
            ->select('DISTINCT j.workerName, j.method');
433
434 1
        $results = $queryBuilder->getQuery()->getArrayResult();
435 1
        if (empty($results)) {
436 1
            return [];
437
        }
438
        $workerMethods = [];
439
        foreach ($results as $result) {
440
            $workerMethods[$result['workerName']][] = $result['method'];
441
        }
442
443
        return $workerMethods;
444
    }
445
446
    /**
447
     * @param string $workerName
448
     * @param string $methodName
449
     */
450 2
    public function countLiveJobs($workerName = null, $methodName = null)
451
    {
452
        /** @var EntityRepository $repository */
453 2
        $repository = $this->getRepository();
454 2
        $queryBuilder = $repository->createQueryBuilder('j');
455 2
        $this->addStandardPredicate($queryBuilder);
456 2
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
457 2
        $queryBuilder->select('count(j.id)');
458
459 2
        return $queryBuilder->getQuery()->getSingleScalarResult();
460
    }
461
462
    /**
463
     * @param string   $workerName
464
     * @param string   $methodName
465
     * @param \Closure $progressCallback
466
     */
467 1
    public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback)
468
    {
469
        // First mark all Live non-running jobs as Archive
470 1
        $repository = $this->getRepository();
471
        /** @var QueryBuilder $queryBuilder */
472 1
        $queryBuilder = $repository->createQueryBuilder('j');
473 1
        $queryBuilder->update($this->getJobClass(), 'j')
474 1
            ->set('j.status', ':statusArchive')
475 1
            ->setParameter(':statusArchive', Job::STATUS_ARCHIVE);
476 1
        $this->addStandardPredicate($queryBuilder);
477 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
478 1
        $resultCount = $queryBuilder->getQuery()->execute();
479
480 1
        if ($resultCount) {
481 1
            $this->runArchive($workerName, $methodName, $progressCallback);
482
        }
483 1
    }
484
485
    /**
486
     * Move jobs in 'archive' status to the archive table.
487
     *
488
     *  This is a bit of a hack to run a lower level query so as to process the INSERT INTO SELECT
489
     *   All on the server as "INSERT INTO SELECT" is not supported natively in Doctrine.
490
     *
491
     * @param string|null $workerName
492
     * @param string|null $methodName
493
     * @param \Closure    $progressCallback
494
     */
495 1
    protected function runArchive($workerName = null, $methodName = null, $progressCallback)
0 ignored issues
show
Unused Code introduced by
The parameter $workerName is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $methodName is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
496
    {
497
        /** @var EntityManager $entityManager */
498 1
        $entityManager = $this->getObjectManager();
499 1
        $count = 0;
500
        do {
501
            /** @var EntityRepository $repository */
502 1
            $repository = $this->getRepository();
503 1
            $queryBuilder = $repository->createQueryBuilder('j');
504 1
            $queryBuilder->where('j.status = :status')
505 1
                ->setParameter(':status', Job::STATUS_ARCHIVE)
506 1
                ->setMaxResults(10000);
507
508 1
            $results = $queryBuilder->getQuery()->getArrayResult();
509 1
            foreach ($results as $jobRow) {
510 1
                $job = $repository->find($jobRow['id']);
511 1
                if ($job) {
512 1
                    $entityManager->remove($job);
513
                }
514 1
                ++$count;
515 1
                if (0 == $count % 10) {
516
                    $this->flush();
517 1
                    $progressCallback($count);
518
                }
519
            }
520 1
            $this->flush();
521 1
            $progressCallback($count);
522 1
        } while (!empty($results) && 10000 == count($results));
523 1
    }
524
}
525