Completed
Push — master ( 70c7f4...7fae80 )
by Matthew
14:52 queued 08:27
created

JobManager::getStatusByEntityName()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 15
ccs 12
cts 12
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 9
nc 3
nop 2
crap 3
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Util\Util;
13
use Symfony\Component\Process\Exception\LogicException;
14
15
class JobManager extends DoctrineJobManager
16
{
17
    use CommonTrait;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20
21 31
    /**
     * Returns the object manager used by this job manager.
     *
     * Delegates to getObjectManagerReset(); the other methods of this class
     * treat the returned value as a Doctrine EntityManager.
     *
     * @return mixed the manager returned by getObjectManagerReset()
     */
    public function getObjectManager()
    {
        $objectManager = $this->getObjectManagerReset();

        return $objectManager;
    }
25
26 3
    /**
     * Counts the rows of an entity class that have the given status.
     *
     * @param string      $objectName entity class name to count from
     * @param mixed       $status     value matched against the "status" field
     * @param string|null $workerName when not null, also filter on the "workerName" field
     * @param string|null $method     when not null, also filter on the "method" field
     *
     * @return int|string the scalar count produced by the query, or 0 when that value is falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $queryBuilder = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status')
            ->setParameter('status', $status);

        if (null !== $workerName) {
            $queryBuilder->andWhere('a.workerName = :workerName')
                ->setParameter('workerName', $workerName);
        }

        if (null !== $method) {
            // Bind the method name itself: binding $workerName to :method made
            // the method filter compare against the worker name.
            $queryBuilder->andWhere('a.method = :method')
                ->setParameter('method', $method);
        }

        $count = $queryBuilder->getQuery()->getSingleScalarResult();

        // Normalise any falsy scalar (null, '0', 0) to integer zero.
        return $count ? $count : 0;
    }
56
57
    /**
     * Deletes rows of the job archive class whose status is BaseJob::STATUS_EXCEPTION.
     *
     * @param string|null $workerName when not null, only delete jobs of this worker
     * @param string|null $method     when not null, only delete jobs of this method
     *
     * @return int Count of jobs pruned
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $deleteBuilder = $entityManager->createQueryBuilder();
        $deleteBuilder->delete($this->getJobArchiveClass(), 'j');
        $deleteBuilder->where('j.status = :status');
        $deleteBuilder->setParameter('status', BaseJob::STATUS_EXCEPTION);
        $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

        $deleted = $deleteBuilder->getQuery()->execute();

        return intval($deleted);
    }
76
77 21
    /**
     * Guards against mixing "save" and any other operation on the same object manager.
     *
     * The object hash of the current manager is remembered per operation in
     * static::$saveInsertCalled / static::$resetInsertCalled. If the opposite
     * operation was already recorded for the same manager, a LogicException
     * is thrown; otherwise the hash is recorded for this operation.
     *
     * @param string $function 'save' selects the save marker; any other value selects the reset marker
     *
     * @throws LogicException when the opposite operation was already recorded for this manager
     */
    protected function resetSaveOk($function)
    {
        $managerHash = spl_object_hash($this->getObjectManager());
        $isSave = 'save' === $function;

        // The marker of the *other* operation is the one that conflicts with this call.
        $conflictingHash = $isSave ? static::$resetInsertCalled : static::$saveInsertCalled;

        if ($managerHash === $conflictingHash) {
            // Insert SQL is cached...
            throw new LogicException("Can't call save and reset within the same process cycle (or using the same EntityManager)");
        }

        if ($isSave) {
            static::$saveInsertCalled = $managerHash;

            return;
        }

        static::$resetInsertCalled = $managerHash;
    }
100
101
    /**
     * Appends optional worker-name and method restrictions to a query on alias "j".
     *
     * Each restriction is added with andWhere() and only when its value is not null.
     *
     * @param QueryBuilder $queryBuilder builder to extend in place
     * @param string|null  $workerName   value for j.workerName, or null to skip
     * @param string|null  $method       value for j.method, or null to skip
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        $hasWorkerName = null !== $workerName;
        $hasMethod = null !== $method;

        if ($hasWorkerName) {
            $queryBuilder->andWhere('j.workerName = :workerName');
            $queryBuilder->setParameter('workerName', $workerName);
        }

        if ($hasMethod) {
            $queryBuilder->andWhere('j.method = :method');
            $queryBuilder->setParameter('method', $method);
        }
    }
115
116 1
    /**
     * Sets the status of expired jobs to Job::STATUS_EXPIRED.
     *
     * A job is updated when its status is BaseJob::STATUS_NEW and its expiresAt
     * is less than or equal to the value of Util::getMicrotimeDateTime().
     *
     * @param string|null $workerName when not null, only update jobs of this worker
     * @param string|null $method     when not null, only update jobs of this method
     *
     * @return int result of the update query passed through intval()
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $updateBuilder = $entityManager->createQueryBuilder();
        $updateBuilder->update($this->getJobClass(), 'j')
            ->set('j.status', ':newStatus')
            ->where('j.expiresAt <= :expiresAt')
            ->setParameter('expiresAt', Util::getMicrotimeDateTime())
            ->andWhere('j.status = :status')
            ->setParameter('status', BaseJob::STATUS_NEW)
            ->setParameter('newStatus', Job::STATUS_EXPIRED);
        $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

        $updated = $updateBuilder->getQuery()->execute();

        return intval($updated);
    }
133
134
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() on the job archive class, comparing
     * against its "updatedAt" field.
     *
     * @param \DateTime $olderThan cut-off passed to removeOlderThan()
     *
     * @return mixed whatever removeOlderThan() returns
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
147
148 2 View Code Duplication
    /**
     * Counts the jobs that match the standard "ready to run" predicate.
     *
     * @param string|null $workerName when not null, only count jobs of this worker
     * @param string|null $method     when not null, only count jobs of this method
     *
     * @return mixed the scalar count returned by the query
     */
    public function getWaitingJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $queryBuilder = $objectManager->createQueryBuilder()
            ->select('count(j)')
            ->from($this->getJobClass(), 'j');

        // Order matters: addStandardPredicate() begins with where(), which
        // replaces any restrictions already on the builder. Adding the
        // worker/method criteria first would drop them from the query while
        // leaving their parameters bound.
        $this->addStandardPredicate($queryBuilder);
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);

        return $queryBuilder->getQuery()->getSingleScalarResult();
    }
163
164
    /**
     * Get Jobs statuses.
     *
     * Collects per-status counts from both the job class and the job archive
     * class, then returns them with each entry's statuses sorted by key.
     *
     * @return array map of "worker->method()" label to a map of status => count
     */
    public function getStatus()
    {
        $collected = [];
        foreach ([$this->getJobClass(), $this->getJobArchiveClass()] as $entityName) {
            $this->getStatusByEntityName($entityName, $collected);
        }

        $finalResult = [];
        foreach ($collected as $label => $statusCounts) {
            ksort($statusCounts);
            foreach ($statusCounts as $status => $count) {
                if (!isset($finalResult[$label][$status])) {
                    $finalResult[$label][$status] = $count;
                    continue;
                }

                $finalResult[$label][$status] += $count;
            }
        }

        return $finalResult;
    }
187
188
    /**
     * Adds the per-status job counts of one entity class to $result.
     *
     * Rows are grouped by worker name, method and status. Each worker/method
     * pair is keyed as "workerName->method()" and, on first sight, seeded with
     * the value of static::getAllStatuses() before the counts are added.
     *
     * @param string $entityName entity class to query
     * @param array  $result     accumulator, modified in place
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $queryBuilder = $entityManager->getRepository($entityName)->createQueryBuilder('j');
        $queryBuilder->select('j.workerName, j.method, j.status, count(j) as c');
        $queryBuilder->groupBy('j.workerName, j.method, j.status');
        $rows = $queryBuilder->getQuery()->getArrayResult();

        foreach ($rows as $row) {
            $label = $row['workerName'].'->'.$row['method'].'()';
            if (!isset($result[$label])) {
                $result[$label] = static::getAllStatuses();
            }

            $result[$label][$row['status']] += intval($row['c']);
        }
    }
206
207
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Fetches up to 100 candidate ids at a time and tries to take each one in
     * turn; the first job successfully taken is returned. The lookup repeats
     * until a query yields no candidates.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize passed through to getJobQueryBuilder()
     * @param int|null    $runId      passed through to takeJob()
     *
     * @return Job|null the taken job, or null when no candidates remain
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        do {
            /** @var QueryBuilder $queryBuilder */
            $queryBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
            $queryBuilder->select('j.id');
            $queryBuilder->setMaxResults(100);

            $candidates = $queryBuilder->getQuery()->getResult();
            if (!empty($candidates)) {
                foreach ($candidates as $candidate) {
                    $takenJob = $this->takeJob($candidate['id'], $runId);
                    if ($takenJob) {
                        return $takenJob;
                    }
                }
            }
            // Every candidate in this batch failed to be taken: query again.
        } while (!empty($candidates));

        return null;
    }
238
239
    /**
     * Builds the query for runnable jobs, optionally filtered and ordered.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize when truthy order by priority DESC then whenUs ASC,
     *                                otherwise by whenUs ASC only
     *
     * @return QueryBuilder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $builder = $repository->createQueryBuilder('j');

        $this->addStandardPredicate($builder);
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if (!$prioritize) {
            $builder->orderBy('j.whenUs', 'ASC');

            return $builder;
        }

        $builder->addOrderBy('j.priority', 'DESC');
        $builder->addOrderBy('j.whenUs', 'ASC');

        return $builder;
    }
263
264 15
    /**
     * Applies the standard job restrictions to a query on alias "j".
     *
     * Restricts to the given status, to jobs whose whenUs is null or not
     * greater than the decimal form of Util::getMicrotimeDateTime(), and to
     * jobs whose expiresAt is null or greater than that same DateTime.
     *
     * The first restriction is set with where(), so callers should add their
     * own andWhere() criteria after calling this method.
     *
     * @param QueryBuilder $queryBuilder builder to modify in place
     * @param mixed        $status       status value to match, BaseJob::STATUS_NEW by default
     */
    protected function addStandardPredicate(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
    {
        $now = Util::getMicrotimeDateTime();
        $nowDecimal = Util::getMicrotimeDecimalFormat($now);

        $expr = $queryBuilder->expr();
        $scheduleReached = $expr->orX(
            $expr->isNull('j.whenUs'),
            $expr->lte('j.whenUs', ':whenUs')
        );
        $notExpired = $expr->orX(
            $expr->isNull('j.expiresAt'),
            $expr->gt('j.expiresAt', ':expiresAt')
        );

        $queryBuilder->where('j.status = :status');
        $queryBuilder->setParameter('status', $status);
        $queryBuilder->andWhere($scheduleReached);
        $queryBuilder->andWhere($notExpired);
        $queryBuilder->setParameter('whenUs', $nowDecimal);
        $queryBuilder->setParameter('expiresAt', $now);
    }
282
283
    /**
     * Tries to claim one job by switching it from STATUS_NEW to STATUS_RUNNING.
     *
     * The update only matches while the job is still in BaseJob::STATUS_NEW.
     * The job counts as taken only when the update reports exactly one row.
     *
     * @param mixed    $jobId id of the job to take
     * @param int|null $runId when not null, stored in the job's runId field
     *
     * @return Job|null the job loaded from the repository when taken, null otherwise
     */
    protected function takeJob($jobId, $runId = null)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();

        /** @var QueryBuilder $claimBuilder */
        $claimBuilder = $repository->createQueryBuilder('j');
        $claimBuilder->update();
        $claimBuilder->set('j.status', ':status');
        $claimBuilder->setParameter('status', BaseJob::STATUS_RUNNING);

        if (null !== $runId) {
            $claimBuilder->set('j.runId', ':runId');
            $claimBuilder->setParameter('runId', $runId);
        }

        $claimBuilder->set('j.startedAt', ':startedAt');
        $claimBuilder->setParameter('startedAt', Util::getMicrotimeDateTime());
        $claimBuilder->where('j.id = :id');
        $claimBuilder->setParameter('id', $jobId);
        $claimBuilder->andWhere('j.status = :statusNew');
        $claimBuilder->setParameter('statusNew', BaseJob::STATUS_NEW);

        $affectedRows = $claimBuilder->getQuery()->execute();
        if (1 !== $affectedRows) {
            return null;
        }

        return $repository->find($jobId);
    }
314
315
    /**
     * Tries to update the nearest job as a batch.
     *
     * Looks up the STATUS_NEW job with the same crcHash and the lowest whenUs.
     * When one exists it is updated to the higher of the two priorities and
     * the earlier of the two whenUs values (compared with bccomp()).
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return null|Job the existing job that was matched, or null when none exists
     *
     * @throws UnsupportedException when $job is not an ORM Job entity
     */
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new UnsupportedException('$job must be instance of '.Job::class);
        }

        /** @var QueryBuilder $lookupBuilder */
        $lookupBuilder = $this->getRepository()->createQueryBuilder('j');
        $lookupBuilder->select();
        $lookupBuilder->where('j.crcHash = :crcHash');
        $lookupBuilder->andWhere('j.status = :status');
        $lookupBuilder->setParameter('status', BaseJob::STATUS_NEW);
        $lookupBuilder->setParameter('crcHash', $job->getCrcHash());
        $lookupBuilder->orderBy('j.whenUs', 'ASC');
        $lookupBuilder->setMaxResults(1);

        $matches = $lookupBuilder->getQuery()->execute();
        if (empty($matches)) {
            return null;
        }

        /** @var Job $existingJob */
        $existingJob = $matches[0];

        $newPriority = max($job->getPriority(), $existingJob->getPriority());

        // Keep whichever whenUs is earlier; ties keep the existing job's value.
        $incomingWhenUs = $job->getWhenUs();
        $existingWhenUs = $existingJob->getWhenUs();
        $newWhenUs = bccomp($incomingWhenUs, $existingWhenUs) < 0 ? $incomingWhenUs : $existingWhenUs;

        $this->updateBatchJob($existingJob, $newPriority, $newWhenUs);

        return $existingJob;
    }
357
358
    /**
     * Writes a changed priority and/or whenUs to an existing job.
     *
     * Only values that differ (strictly) from the job's current ones are set,
     * both on the entity object and through an update query by id. Nothing is
     * executed when neither value differs.
     *
     * @param Job    $existingJob job to update
     * @param int    $newPriority
     * @param string $newWhenUs
     *
     * @return Job the same $existingJob instance
     */
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
    {
        $priorityChanged = $newPriority !== $existingJob->getPriority();
        $whenUsChanged = $newWhenUs !== $existingJob->getWhenUs();

        if (!$priorityChanged && !$whenUsChanged) {
            return $existingJob;
        }

        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $updateBuilder */
        $updateBuilder = $repository->createQueryBuilder('j');
        $updateBuilder->update();

        if ($priorityChanged) {
            $existingJob->setPriority($newPriority);
            $updateBuilder->set('j.priority', ':priority');
            $updateBuilder->setParameter('priority', $newPriority);
        }

        if ($whenUsChanged) {
            $existingJob->setWhenUs($newWhenUs);
            $updateBuilder->set('j.whenUs', ':whenUs');
            $updateBuilder->setParameter('whenUs', $newWhenUs);
        }

        $updateBuilder->where('j.id = :id');
        $updateBuilder->setParameter('id', $existingJob->getId());
        $updateBuilder->getQuery()->execute();

        return $existingJob;
    }
390
391 2
    /**
     * Lists the distinct worker/method pairs of jobs matching the standard predicate.
     *
     * @param mixed $status status passed to addStandardPredicate(), BaseJob::STATUS_NEW by default
     *
     * @return array map of worker name to a list of its method names; empty when nothing matches
     */
    public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $builder = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($builder, $status);
        $builder->select('DISTINCT j.workerName, j.method');

        $rows = $builder->getQuery()->getArrayResult();

        $workerMethods = [];
        if (!empty($rows)) {
            foreach ($rows as $row) {
                $workerMethods[$row['workerName']][] = $row['method'];
            }
        }

        return $workerMethods;
    }
411
412
    /**
     * Counts the jobs in the live job repository that match the standard predicate.
     *
     * @param string|null $workerName when not null, only count jobs of this worker
     * @param string|null $methodName when not null, only count jobs of this method
     *
     * @return mixed the scalar count returned by the query
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();

        $countBuilder = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($countBuilder);
        $this->addWorkerNameCriterion($countBuilder, $workerName, $methodName);
        $countBuilder->select('count(j.id)');

        $countQuery = $countBuilder->getQuery();

        return $countQuery->getSingleScalarResult();
    }
427
428
    /**
     * Marks matching live jobs as Job::STATUS_ARCHIVE, then runs the archive step.
     *
     * Jobs are selected with the standard predicate plus the optional worker
     * and method filters. runArchive() is only invoked when the update
     * reports a truthy result.
     *
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback forwarded to runArchive()
     */
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        // First mark all Live non-running jobs as Archive
        $repository = $this->getRepository();

        /** @var QueryBuilder $markBuilder */
        $markBuilder = $repository->createQueryBuilder('j');
        $markBuilder->update($this->getJobClass(), 'j');
        $markBuilder->set('j.status', ':statusArchive');
        $markBuilder->setParameter('statusArchive', Job::STATUS_ARCHIVE);
        $this->addStandardPredicate($markBuilder);
        $this->addWorkerNameCriterion($markBuilder, $workerName, $methodName);

        $markedCount = $markBuilder->getQuery()->execute();
        if (!$markedCount) {
            return;
        }

        $this->runArchive($workerName, $methodName, $progressCallback);
    }
450
451
    /**
     * Processes jobs in 'archive' status by removing them through the entity manager.
     *
     * Jobs with Job::STATUS_ARCHIVE are fetched in batches of up to 10000 rows.
     * Each one is loaded by id and, when found, passed to the entity manager's
     * remove(). Changes are flushed and progress reported after every 10 rows
     * and again at the end of each batch. Another batch is fetched only when
     * the previous one was full.
     *
     * NOTE(review): presumably the removal is what moves a job to the archive
     * table (e.g. via a listener elsewhere) — confirm; nothing in this method
     * inserts archive rows itself.
     *
     * @param string|null   $workerName       not used by this method
     * @param string|null   $methodName       not used by this method
     * @param callable|null $progressCallback forwarded to updateProgress() with the running count
     */
    protected function runArchive($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $batchSize = 10000;
        $processed = 0;

        do {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            $rows = $repository->createQueryBuilder('j')
                ->where('j.status = :status')
                ->setParameter('status', Job::STATUS_ARCHIVE)
                ->setMaxResults($batchSize)
                ->getQuery()
                ->getArrayResult();

            foreach ($rows as $row) {
                $job = $repository->find($row['id']);
                if ($job) {
                    $entityManager->remove($job);
                }

                ++$processed;
                if (0 == $processed % 10) {
                    $this->flush();
                    $this->updateProgress($progressCallback, $processed);
                }
            }

            $this->flush();
            $this->updateProgress($progressCallback, $processed);
            // A full batch means more archive-status rows may remain.
        } while ($batchSize == count($rows));
    }
490
}
491