Completed
Pull Request — master (#53)
by Matthew
07:37
created

JobManager::addWorkerNameCriterion()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 4
nop 3
crap 3
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Util\Util;
13
use Symfony\Component\Process\Exception\LogicException;
14
15
class JobManager extends DoctrineJobManager
16
{
17
    use CommonTrait;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20
21 32
    /**
     * Returns the parent's object manager after passing it through getObjectManagerReset().
     *
     * NOTE(review): static analysis reported that the parent is declared to return the
     * Doctrine\Common\Persistence\ObjectManager interface while getObjectManagerReset()
     * expects a Doctrine\ORM\EntityManager; confirm the configured manager is always an EntityManager.
     */
    public function getObjectManager()
    {
        return $this->getObjectManagerReset(parent::getObjectManager());
    }
27
28 3
    /**
     * Counts the rows of the given entity class that are in the given status,
     * optionally restricted to a worker name and/or a method.
     *
     * @param string      $objectName Entity class name to count
     * @param mixed       $status     Status value to match
     * @param string|null $workerName Only count jobs of this worker when not null
     * @param string|null $method     Only count jobs of this method when not null
     *
     * @return mixed The scalar count returned by Doctrine, or 0 when that value is falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $queryBuilder = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status')
            ->setParameter(':status', $status);

        if (null !== $workerName) {
            $queryBuilder->andWhere('a.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            // The method filter must be bound to $method; it used to be bound to $workerName.
            $queryBuilder->andWhere('a.method = :method')
                ->setParameter(':method', $method);
        }

        $count = $queryBuilder->getQuery()->getSingleScalarResult();

        // Normalise any falsy result (null, '0', 0) to integer 0, as before.
        return $count ?: 0;
    }
58
59
    /**
     * Deletes archived jobs whose status is BaseJob::STATUS_EXCEPTION,
     * optionally limited to one worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Count of jobs pruned
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $deleteBuilder = $objectManager->createQueryBuilder();
        $deleteBuilder->delete($this->getJobArchiveClass(), 'j');
        $deleteBuilder->where('j.status = :status');
        $deleteBuilder->setParameter(':status', BaseJob::STATUS_EXCEPTION);

        $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

        return intval($deleteBuilder->getQuery()->execute());
    }
78
79 21
    /**
     * Guards against mixing "save" and "reset" operations on the same object manager instance.
     *
     * The object hash of the current manager is compared with the hash recorded by the
     * opposite operation; on a match a LogicException is thrown, otherwise the hash is
     * recorded for the current operation. Any value other than 'save' is treated as the
     * reset operation.
     *
     * @param string $function 'save' for a save operation; anything else counts as reset
     *
     * @throws LogicException when the opposite operation was already recorded for this manager
     */
    protected function resetSaveOk($function)
    {
        $currentHash = spl_object_hash($this->getObjectManager());
        $isSave = 'save' === $function;

        // A save conflicts with a previously recorded reset, and the other way round.
        $conflictingHash = $isSave ? static::$resetInsertCalled : static::$saveInsertCalled;

        if ($currentHash === $conflictingHash) {
            // Insert SQL is cached...
            throw new LogicException("Can't call save and reset within the same process cycle (or using the same EntityManager)");
        }

        if ($isSave) {
            static::$saveInsertCalled = $currentHash;
        } else {
            static::$resetInsertCalled = $currentHash;
        }
    }
102
103
    /**
     * Appends optional worker-name and method restrictions (alias "j") to the query builder.
     *
     * Each restriction is added with andWhere() and only when its value is not null.
     *
     * @param QueryBuilder $queryBuilder Builder to extend
     * @param string|null  $workerName   Worker name to match, or null for no restriction
     * @param string|null  $method       Method to match, or null for no restriction
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        $criteria = [
            ['j.workerName = :workerName', ':workerName', $workerName],
            ['j.method = :method', ':method', $method],
        ];

        foreach ($criteria as $criterion) {
            list($predicate, $parameterName, $value) = $criterion;
            if (null === $value) {
                continue;
            }

            $queryBuilder->andWhere($predicate);
            $queryBuilder->setParameter($parameterName, $value);
        }
    }
117
118 1
    /**
     * Marks jobs in status BaseJob::STATUS_NEW whose expiresAt is at or before the current
     * time as Job::STATUS_EXPIRED, optionally limited to one worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Result of the update query converted with intval()
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $updateBuilder = $objectManager->createQueryBuilder()
            ->update($this->getJobClass(), 'j')
            ->set('j.status', ':newStatus')
            ->where('j.expiresAt <= :expiresAt')
            ->setParameter(':expiresAt', Util::getMicrotimeDateTime())
            ->andWhere('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_NEW)
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);

        $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

        return intval($updateBuilder->getQuery()->execute());
    }
135
136
    /**
     * Removes archived jobs older than $olderThan.
     *
     * Delegates to removeOlderThan() for the job archive class, using the "updatedAt" field.
     *
     * @param \DateTime $olderThan Cut-off date
     *
     * @return mixed Whatever removeOlderThan() returns
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
149
150 2 View Code Duplication
    /**
     * Counts the jobs that match the standard "ready to run" predicate,
     * optionally limited to one worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed The scalar count returned by Doctrine
     */
    public function getWaitingJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $queryBuilder = $objectManager->createQueryBuilder();

        $queryBuilder = $queryBuilder->select('count(j)')->from($this->getJobClass(), 'j');

        // addStandardPredicate() starts its condition with where(), which replaces any
        // restriction added earlier, so it has to run before the andWhere()-based
        // worker/method criteria (same order as getJobQueryBuilder() and countLiveJobs()).
        $this->addStandardPredicate($queryBuilder);
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);

        $query = $queryBuilder->getQuery();

        return $query->getSingleScalarResult();
    }
165
166
    /**
     * Get Jobs statuses.
     *
     * Collects per-status counts from the job class and then the job archive class into one
     * map keyed by the "worker->method()" label built in getStatusByEntityName(), and returns
     * that map with each entry's statuses sorted by key.
     *
     * @return array Map of label => (status => count)
     */
    public function getStatus()
    {
        $collected = [];
        $this->getStatusByEntityName($this->getJobClass(), $collected);
        $this->getStatusByEntityName($this->getJobArchiveClass(), $collected);

        $sorted = [];
        foreach ($collected as $label => $statusCounts) {
            ksort($statusCounts);
            foreach ($statusCounts as $status => $count) {
                if (!isset($sorted[$label][$status])) {
                    $sorted[$label][$status] = $count;
                    continue;
                }

                $sorted[$label][$status] += $count;
            }
        }

        return $sorted;
    }
189
190
    /**
     * Adds the per-status job counts of one entity class to $result.
     *
     * Rows are grouped by worker name, method and status. Each worker/method pair is stored
     * under the key "<workerName>-><method>()", initialised from getAllStatuses() on first use,
     * and the grouped count is added to the matching status entry.
     *
     * @param string $entityName Entity class to query
     * @param array  $result     Accumulator, modified in place
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $groupedQuery = $objectManager->getRepository($entityName)->createQueryBuilder('j');
        $groupedQuery->select('j.workerName, j.method, j.status, count(j) as c');
        $groupedQuery->groupBy('j.workerName, j.method, j.status');
        $rows = $groupedQuery->getQuery()->getArrayResult();

        foreach ($rows as $row) {
            $label = $row['workerName'].'->'.$row['method'].'()';
            if (!isset($result[$label])) {
                $result[$label] = static::getAllStatuses();
            }
            $result[$label][$row['status']] += intval($row['c']);
        }
    }
208
209
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Repeatedly fetches up to 100 candidate job ids and tries to take each one in order;
     * the first job that takeJob() returns is handed back. The search ends with null as
     * soon as a fetch yields no candidates.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param int|null    $runId
     *
     * @return Job|null
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        while (true) {
            /** @var QueryBuilder $candidateQuery */
            $candidateQuery = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
            $candidateQuery->select('j.id');
            $candidateQuery->setMaxResults(100);

            $candidates = $candidateQuery->getQuery()->getResult();
            if (empty($candidates)) {
                return null;
            }

            foreach ($candidates as $candidate) {
                $takenJob = $this->takeJob($candidate['id'], $runId);
                if ($takenJob) {
                    return $takenJob;
                }
            }
            // None of the candidates could be taken; fetch a fresh batch.
        }
    }
240
241
    /**
     * Builds the query for runnable jobs: the standard predicate plus the optional
     * worker/method restrictions, ordered by whenUs ascending and, when prioritising,
     * by priority descending first.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @return QueryBuilder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $jobQuery = $repository->createQueryBuilder('j');

        $this->addStandardPredicate($jobQuery);
        $this->addWorkerNameCriterion($jobQuery, $workerName, $methodName);

        if (!$prioritize) {
            $jobQuery->orderBy('j.whenUs', 'ASC');

            return $jobQuery;
        }

        $jobQuery->addOrderBy('j.priority', 'DESC');
        $jobQuery->addOrderBy('j.whenUs', 'ASC');

        return $jobQuery;
    }
265
266 15
    /**
     * Sets the base restriction for alias "j": the given status, whenUs null or not later
     * than now, and expiresAt null or later than now.
     *
     * The restriction is started with where(), so conditions added to the builder before
     * this call do not survive; add further criteria afterwards with andWhere().
     *
     * @param QueryBuilder $queryBuilder Builder to restrict
     * @param mixed        $status       Status to match (defaults to BaseJob::STATUS_NEW)
     */
    protected function addStandardPredicate(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
    {
        $now = Util::getMicrotimeDateTime();
        $nowDecimal = Util::getMicrotimeDecimalFormat($now);

        $expr = $queryBuilder->expr();
        $isDue = $expr->orX(
            $expr->isNull('j.whenUs'),
            $expr->lte('j.whenUs', ':whenUs')
        );
        $isNotExpired = $expr->orX(
            $expr->isNull('j.expiresAt'),
            $expr->gt('j.expiresAt', ':expiresAt')
        );

        $queryBuilder->where('j.status = :status');
        $queryBuilder->setParameter(':status', $status);
        $queryBuilder->andWhere($isDue);
        $queryBuilder->andWhere($isNotExpired);
        $queryBuilder->setParameter(':whenUs', $nowDecimal);
        $queryBuilder->setParameter(':expiresAt', $now);
    }
284
285
    /**
     * Tries to claim a job by switching it to BaseJob::STATUS_RUNNING.
     *
     * The update only matches while the job is still in BaseJob::STATUS_NEW, so when several
     * workers selected the same id only the first update reports one affected row; the
     * others get null and getJob() moves on to its next candidate.
     *
     * @param mixed    $jobId Identifier of the job to claim
     * @param int|null $runId Run id to store on the job, when given
     *
     * @return object|null The job entity when exactly one row was updated, null otherwise
     */
    protected function takeJob($jobId, $runId = null)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $repository->createQueryBuilder('j');
        $queryBuilder
            ->update()
            ->set('j.status', ':status')
            ->setParameter(':status', BaseJob::STATUS_RUNNING);
        if (null !== $runId) {
            $queryBuilder
                ->set('j.runId', ':runId')
                ->setParameter(':runId', $runId);
        }
        $queryBuilder->set('j.startedAt', ':startedAt')
            ->setParameter(':startedAt', Util::getMicrotimeDateTime());
        $queryBuilder->where('j.id = :id')
            ->setParameter(':id', $jobId);
        // Claim guard: without it a second worker's update would also affect one row
        // (startedAt changes) and the same job would be handed out twice.
        $queryBuilder->andWhere('j.status = :statusNew')
            ->setParameter(':statusNew', BaseJob::STATUS_NEW);
        $resultCount = $queryBuilder->getQuery()->execute();

        if (1 === $resultCount) {
            return $repository->find($jobId);
        }

        return null;
    }
315
316
    /**
     * Tries to update the nearest job as a batch.
     *
     * Looks up the earliest (by whenUs) job in status BaseJob::STATUS_NEW with the same
     * crcHash as $job. When one exists it is updated to the higher of the two priorities
     * and the earlier of the two whenUs values (compared with bccomp()).
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return null|Job The existing job that was merged into, or null when none matched
     *
     * @throws UnsupportedException when $job is not an ORM Job entity
     */
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new UnsupportedException('$job must be instance of '.Job::class);
        }

        /** @var QueryBuilder $lookup */
        $lookup = $this->getRepository()->createQueryBuilder('j');
        $lookup->select();
        $lookup->where('j.crcHash = :crcHash');
        $lookup->andWhere('j.status = :status');
        $lookup->setParameter(':status', BaseJob::STATUS_NEW);
        $lookup->setParameter(':crcHash', $job->getCrcHash());
        $lookup->orderBy('j.whenUs', 'ASC');
        $lookup->setMaxResults(1);

        $matches = $lookup->getQuery()->execute();
        if (empty($matches)) {
            return null;
        }

        /** @var Job $existingJob */
        $existingJob = $matches[0];

        $mergedPriority = max($job->getPriority(), $existingJob->getPriority());
        $incomingIsEarlier = bccomp($job->getWhenUs(), $existingJob->getWhenUs()) < 0;
        $mergedWhenUs = $incomingIsEarlier ? $job->getWhenUs() : $existingJob->getWhenUs();

        $this->updateBatchJob($existingJob, $mergedPriority, $mergedWhenUs);

        return $existingJob;
    }
358
359
    /**
     * Applies a new priority and/or whenUs to an existing job, both on the entity object
     * and through an UPDATE query, touching only the values that actually differ
     * (strict comparison). Nothing is executed when neither value changed.
     *
     * @param Job    $existingJob
     * @param int    $newPriority
     * @param string $newWhenUs
     *
     * @return Job The same $existingJob instance
     */
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
    {
        $priorityChanged = $newPriority !== $existingJob->getPriority();
        $whenUsChanged = $newWhenUs !== $existingJob->getWhenUs();

        if (!$priorityChanged && !$whenUsChanged) {
            return $existingJob;
        }

        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $updateBuilder */
        $updateBuilder = $repository->createQueryBuilder('j');
        $updateBuilder->update();

        if ($priorityChanged) {
            $existingJob->setPriority($newPriority);
            $updateBuilder->set('j.priority', ':priority');
            $updateBuilder->setParameter(':priority', $newPriority);
        }

        if ($whenUsChanged) {
            $existingJob->setWhenUs($newWhenUs);
            $updateBuilder->set('j.whenUs', ':whenUs');
            $updateBuilder->setParameter(':whenUs', $newWhenUs);
        }

        $updateBuilder->where('j.id = :id');
        $updateBuilder->setParameter(':id', $existingJob->getId());
        $updateBuilder->getQuery()->execute();

        return $existingJob;
    }
391
392 2
    /**
     * Lists the distinct worker/method pairs among the jobs matching the standard
     * predicate for the given status.
     *
     * @param mixed $status Status to match (defaults to BaseJob::STATUS_NEW)
     *
     * @return array Map of worker name => list of method names; empty when nothing matches
     */
    public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $distinctQuery = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($distinctQuery, $status);
        $distinctQuery->select('DISTINCT j.workerName, j.method');

        $methodsByWorker = [];
        foreach ($distinctQuery->getQuery()->getArrayResult() as $row) {
            $methodsByWorker[$row['workerName']][] = $row['method'];
        }

        return $methodsByWorker;
    }
412
413
    /**
     * Counts the jobs matching the standard predicate, optionally limited to one
     * worker name and/or method.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     *
     * @return mixed The scalar count returned by Doctrine
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();

        $countQuery = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($countQuery);
        $this->addWorkerNameCriterion($countQuery, $workerName, $methodName);
        $countQuery->select('count(j.id)');

        $query = $countQuery->getQuery();

        return $query->getSingleScalarResult();
    }
428
429
    /**
     * Marks every job matching the standard predicate (optionally limited to one worker
     * name and/or method) as Job::STATUS_ARCHIVE and, when the update reports a truthy
     * result, runs runArchive() to process the marked jobs.
     *
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback Passed through to runArchive()
     */
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        // First mark all Live non-running jobs as Archive
        /** @var QueryBuilder $markQuery */
        $markQuery = $this->getRepository()->createQueryBuilder('j');
        $markQuery->update($this->getJobClass(), 'j');
        $markQuery->set('j.status', ':statusArchive');
        $markQuery->setParameter(':statusArchive', Job::STATUS_ARCHIVE);

        $this->addStandardPredicate($markQuery);
        $this->addWorkerNameCriterion($markQuery, $workerName, $methodName);

        $markedCount = $markQuery->getQuery()->execute();
        if (!$markedCount) {
            return;
        }

        $this->runArchive($workerName, $methodName, $progressCallback);
    }
451
452
    /**
     * Processes jobs in 'archive' status in batches of up to 10000: each job is loaded and
     * removed through the entity manager, with a flush and a progress update after every
     * 10 rows and again at the end of each batch. Batches repeat while a full batch of
     * 10000 rows was returned.
     *
     * NOTE(review): presumably the removal is what moves a job to the archive table (for
     * example through a listener) — that is not visible in this class; confirm.
     *
     * $workerName and $methodName are accepted but not used: every job in archive status
     * is processed regardless of worker or method.
     *
     * @param string|null   $workerName       Unused
     * @param string|null   $methodName       Unused
     * @param callable|null $progressCallback Passed to updateProgress() with the running count
     */
    protected function runArchive($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $batchSize = 10000;
        $processed = 0;

        do {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            $rows = $repository->createQueryBuilder('j')
                ->where('j.status = :status')
                ->setParameter(':status', Job::STATUS_ARCHIVE)
                ->setMaxResults($batchSize)
                ->getQuery()
                ->getArrayResult();

            foreach ($rows as $row) {
                $job = $repository->find($row['id']);
                if ($job) {
                    $entityManager->remove($job);
                }

                // Rows whose entity could not be loaded still count as processed.
                ++$processed;
                if (0 == $processed % 10) {
                    $this->flush();
                    $this->updateProgress($progressCallback, $processed);
                }
            }

            $this->flush();
            $this->updateProgress($progressCallback, $processed);
            // A short batch means there is nothing left to fetch.
        } while ($batchSize == count($rows));
    }
491
}
492