Completed
Pull Request — master (#40)
by Matthew
17:20
created

JobManager::addWorkerNameCriterion()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 4
nop 3
crap 3
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Util\Util;
13
use Symfony\Component\Process\Exception\LogicException;
14
15
class JobManager extends DoctrineJobManager
16
{
17
    use CommonTrait;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20
21
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
22
    {
23 3
        /** @var EntityManager $objectManager */
24
        $objectManager = $this->getObjectManager();
25
26 3
        $queryBuilder = $objectManager
27
            ->createQueryBuilder()
28
            ->select('count(a.id)')
29 3
            ->from($objectName, 'a')
30 3
            ->where('a.status = :status');
31 3
32 3
        if (null !== $workerName) {
33
            $queryBuilder->andWhere('a.workerName = :workerName')
34 3
                ->setParameter(':workerName', $workerName);
35 1
        }
36 1
37 1
        if (null !== $method) {
38
            $queryBuilder->andWhere('a.method = :method')
39 3
                ->setParameter(':method', $workerName);
40 1
        }
41 1
42 1
        $count = $queryBuilder->setParameter(':status', $status)
43
            ->getQuery()->getSingleScalarResult();
44 3
45 3
        if (!$count) {
46
            return 0;
47 3
        }
48 1
49
        return $count;
50
    }
51 3
52
    /**
53
     * @param string|null $workerName
54
     * @param string|null $method
55
     *
56
     * @return int Count of jobs pruned
57
     */
58
    public function pruneExceptionJobs($workerName = null, $method = null)
59
    {
60 1
        /** @var EntityManager $objectManager */
61
        $objectManager = $this->getObjectManager();
62
        $queryBuilder = $objectManager->createQueryBuilder()->delete($this->getJobArchiveClass(), 'j');
63 1
        $queryBuilder->where('j.status = :status')
64 1
            ->setParameter(':status', BaseJob::STATUS_EXCEPTION);
65 1
66 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
67
        $query = $queryBuilder->getQuery();
68 1
69 1
        return intval($query->execute());
70
    }
71 1
72 1
    protected function resetSaveOk($function)
73
    {
74 21
        $objectManager = $this->getObjectManager();
75
        $splObjectHash = spl_object_hash($objectManager);
76 21
77 21
        if ('save' === $function) {
78
            $compare = static::$resetInsertCalled;
79 21
        } else {
80
            $compare = static::$saveInsertCalled;
81
        }
82 21
83
        if ($splObjectHash === $compare) {
84
            // Insert SQL is cached...
85 21
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
86
            throw new LogicException($msg);
87
        }
88
89
        if ('save' === $function) {
90
            static::$saveInsertCalled = spl_object_hash($objectManager);
91 21
        } else {
92
            static::$resetInsertCalled = spl_object_hash($objectManager);
93
        }
94 21
    }
95
96 21
    /**
97
     * @param string $workerName
98
     * @param string $method
99
     */
100
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
101
    {
102 13
        if (null !== $workerName) {
103
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
104 13
        }
105 5
106 5
        if (null !== $method) {
107
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
108 13
        }
109 4
    }
110 4
111 13
    protected function updateExpired($workerName = null, $method = null)
112
    {
113 1
        /** @var EntityManager $objectManager */
114
        $objectManager = $this->getObjectManager();
115
        $queryBuilder = $objectManager->createQueryBuilder()->update($this->getJobClass(), 'j');
116 1
        $queryBuilder->set('j.status', ':newStatus');
117 1
        $queryBuilder->where('j.expiresAt <= :expiresAt')
118 1
            ->setParameter(':expiresAt', Util::getMicrotimeDateTime());
119 1
        $queryBuilder->andWhere('j.status = :status')
120 1
            ->setParameter(':status', BaseJob::STATUS_NEW)
121 1
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);
122 1
123 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
124
        $query = $queryBuilder->getQuery();
125 1
126 1
        return intval($query->execute());
127
    }
128 1
129
    /**
130
     * Removes archived jobs older than $olderThan.
131
     *
132
     * @param \DateTime $olderThan
133
     */
134
    public function pruneArchivedJobs(\DateTime $olderThan)
135
    {
136 1
        return $this->removeOlderThan(
137
            $this->getJobArchiveClass(),
138 1
                'updatedAt',
139 1
                $olderThan
140 1
        );
141
    }
142 1
143 View Code Duplication
    public function getWaitingJobCount($workerName = null, $method = null)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
144
    {
145 2
        /** @var EntityManager $objectManager */
146
        $objectManager = $this->getObjectManager();
147
        $queryBuilder = $objectManager->createQueryBuilder();
148 2
149 2
        $queryBuilder = $queryBuilder->select('count(j)')->from($this->getJobClass(), 'j');
150
151 2
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
152
        $this->addStandardPredicate($queryBuilder);
153 2
154 2
        $query = $queryBuilder->getQuery();
155
156
        return $query->getSingleScalarResult();
157
    }
158
159
    /**
160
     * Get Jobs statuses.
161
     */
162
    public function getStatus()
163
    {
164
        $result = [];
165
        $this->getStatusByEntityName($this->getJobClass(), $result);
166 2
        $this->getStatusByEntityName($this->getJobArchiveClass(), $result);
167
168
        $finalResult = [];
169
        foreach ($result as $key => $item) {
170
            ksort($item);
171
            foreach ($item as $status => $count) {
172
                if (isset($finalResult[$key][$status])) {
173 2
                    $finalResult[$key][$status] += $count;
174 2
                } else {
175 2
                    $finalResult[$key][$status] = $count;
176 2
                }
177 2
            }
178 2
        }
179 2
180 2
        return $finalResult;
181 2
    }
182 2
183
    /**
184 2
     * @param string $entityName
185
     */
186 2
    protected function getStatusByEntityName($entityName, array &$result)
187
    {
188
        /** @var EntityManager $objectManager */
189
        $objectManager = $this->getObjectManager();
190
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
191
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
192 3
193
        foreach ($result1 as $item) {
194 3
            $method = $item['workerName'].'->'.$item['method'].'()';
195 3
            if (!isset($result[$method])) {
196 3
                $result[$method] = static::getAllStatuses();
197
            }
198 3
            $result[$method][$item['status']] += intval($item['c']);
199 3
        }
200 1
    }
201 1
202 1
    /**
203
     * Get the next job to run (can be filtered by workername and method name).
204
     *
205 1
     * @param string $workerName
206
     * @param string $methodName
207 1
     * @param bool   $prioritize
208 3
     * @param int    $runId
209
     *
210 3
     * @return Job|null
211
     */
212
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
213
    {
214
        do {
215
            $queryBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
216 3
            $queryBuilder->select('j.id');
217
            $queryBuilder->setMaxResults(100);
218
219 3
            /** @var QueryBuilder $queryBuilder */
220 3
            $query = $queryBuilder->getQuery();
221 3
            $jobs = $query->getResult();
222
            if ($jobs) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $jobs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
223 3
                foreach ($jobs as $job) {
224 1
                    if ($job = $this->takeJob($job['id'], $runId)) {
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $job is correct as $this->takeJob($job['id'], $runId) (which targets Dtc\QueueBundle\ORM\JobManager::takeJob()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
225 1
                        return $job;
226 1
                    }
227 1
                }
228 1
            }
229 1
        } while ($jobs);
0 ignored issues
show
Bug Best Practice introduced by
The expression $jobs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
230 1
231 1
        return null;
232 1
    }
233 1
234 1
    /**
235 1
     * @param string|null $workerName
236 1
     * @param string|null $methodName
237 1
     * @param bool        $prioritize
238 1
     *
239 3
     * @return QueryBuilder
240 3
     */
241
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
242
    {
243
        /** @var EntityRepository $repository */
244
        $repository = $this->getRepository();
245
        $queryBuilder = $repository->createQueryBuilder('j');
246
        $this->addStandardPredicate($queryBuilder);
247
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
248
249
        if ($prioritize) {
250
            $queryBuilder->addOrderBy('j.priority', 'DESC');
251
            $queryBuilder->addOrderBy('j.whenUs', 'ASC');
252 11
        } else {
253
            $queryBuilder->orderBy('j.whenUs', 'ASC');
254
        }
255 11
256 11
        return $queryBuilder;
257 11
    }
258
259
    protected function addStandardPredicate(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
260 11
    {
261 11
        $dateTime = Util::getMicrotimeDateTime();
262 11
        $decimal = Util::getMicrotimeDecimalFormat($dateTime);
263 9
264 9
        $queryBuilder
265 9
            ->where('j.status = :status')->setParameter(':status', $status)
266
            ->andWhere($queryBuilder->expr()->orX(
267
                $queryBuilder->expr()->isNull('j.whenUs'),
268
                $queryBuilder->expr()->lte('j.whenUs', ':whenUs')
269 6
            ))
270
            ->andWhere($queryBuilder->expr()->orX(
271 6
                $queryBuilder->expr()->isNull('j.expiresAt'),
272
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
273
            ))
274
            ->setParameter(':whenUs', $decimal)
275
            ->setParameter(':expiresAt', $dateTime);
276
    }
277
278
    /**
279
     * @param int $runId
280
     */
281 11
    protected function takeJob($jobId, $runId = null)
282
    {
283
        /** @var EntityRepository $repository */
284 11
        $repository = $this->getRepository();
285 11
        /** @var QueryBuilder $queryBuilder */
286 11
        $queryBuilder = $repository->createQueryBuilder('j');
287 11
        $queryBuilder
288
            ->update()
289 11
            ->set('j.status', ':status')
290 11
            ->setParameter(':status', BaseJob::STATUS_RUNNING);
291 11
        if (null !== $runId) {
292 11
            $queryBuilder
293 1
                ->set('j.runId', ':runId')
294
                ->setParameter(':runId', $runId);
295
        }
296 11
        $queryBuilder->set('j.startedAt', ':startedAt')
297
            ->setParameter(':startedAt', Util::getMicrotimeDateTime());
298
        $queryBuilder->where('j.id = :id');
299 12
        $queryBuilder->setParameter(':id', $jobId);
300
        $resultCount = $queryBuilder->getQuery()->execute();
301 12
302 12
        if (1 === $resultCount) {
303
            return $repository->find($jobId);
304
        }
305 12
306 12
        return null;
307 12
    }
308 12
309 12
    /**
310 12
     * Tries to update the nearest job as a batch.
311 12
     *
312 12
     * @param \Dtc\QueueBundle\Model\Job $job
313 12
     *
314 12
     * @return null|Job
315 12
     */
316 12
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
317
    {
318
        if (!$job instanceof Job) {
319
            throw new UnsupportedException('$job must be instance of '.Job::class);
320
        }
321 9
322
        /** @var QueryBuilder $queryBuilder */
323
        $queryBuilder = $this->getRepository()->createQueryBuilder('j');
324 9
        $queryBuilder->select()
325
            ->where('j.crcHash = :crcHash')
326 9
            ->andWhere('j.status = :status')
327
            ->setParameter(':status', BaseJob::STATUS_NEW)
328 9
            ->setParameter(':crcHash', $job->getCrcHash())
329 9
            ->orderBy('j.whenUs', 'ASC')
330 9
            ->setMaxResults(1);
331 9
        $existingJobs = $queryBuilder->getQuery()->execute();
332
333 1
        if (empty($existingJobs)) {
334 2
            return null;
335 1
        }
336 9
337 9
        /** @var Job $existingJob */
338 9
        $existingJob = $existingJobs[0];
339 9
340 9
        $newPriority = max($job->getPriority(), $existingJob->getPriority());
341
        $newWhenUs = $existingJob->getWhenUs();
342 9
        $bcResult = bccomp($job->getWhenUs(), $existingJob->getWhenUs());
343 9
        if ($bcResult < 0) {
344
            $newWhenUs = $job->getWhenUs();
345
        }
346
347
        $this->updateBatchJob($existingJob, $newPriority, $newWhenUs);
348
349
        return $existingJob;
350
    }
351
352
    /**
353
     * @param int    $newPriority
354
     * @param string $newWhenUs
355
     */
356 1
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
357
    {
358 1
        $existingPriority = $existingJob->getPriority();
359
        $existingWhenUs = $existingJob->getWhenUs();
360
361
        if ($newPriority !== $existingPriority || $newWhenUs !== $existingWhenUs) {
362
            /** @var EntityRepository $repository */
363 1
            $repository = $this->getRepository();
364 1
            /** @var QueryBuilder $queryBuilder */
365 1
            $queryBuilder = $repository->createQueryBuilder('j');
366 1
            $queryBuilder->update();
367 1
            if ($newPriority !== $existingPriority) {
368 1
                $existingJob->setPriority($newPriority);
369 1
                $queryBuilder->set('j.priority', ':priority')
370 1
                    ->setParameter(':priority', $newPriority);
371 1
            }
372
            if ($newWhenUs !== $existingWhenUs) {
373 1
                $existingJob->setWhenUs($newWhenUs);
374
                $queryBuilder->set('j.whenUs', ':whenUs')
375
                    ->setParameter(':whenUs', $newWhenUs);
376
            }
377
            $queryBuilder->where('j.id = :id');
378 1
            $queryBuilder->setParameter(':id', $existingJob->getId());
379
            $queryBuilder->getQuery()->execute();
380 1
        }
381 1
382 1
        return $existingJob;
383 1
    }
384 1
385 1
    public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
386
    {
387 1
        /** @var EntityRepository $repository */
388
        $repository = $this->getRepository();
389 1
        $queryBuilder = $repository->createQueryBuilder('j');
390
        $this->addStandardPredicate($queryBuilder, $status);
391
        $queryBuilder
392
            ->select('DISTINCT j.workerName, j.method');
393
394
        $results = $queryBuilder->getQuery()->getArrayResult();
395
        if (empty($results)) {
396 1
            return [];
397
        }
398 1
        $workerMethods = [];
399 1
        foreach ($results as $result) {
400
            $workerMethods[$result['workerName']][] = $result['method'];
401 1
        }
402
403 1
        return $workerMethods;
404
    }
405 1
406 1
    /**
407 1
     * @param string $workerName
408 1
     * @param string $methodName
409 1
     */
410 1
    public function countLiveJobs($workerName = null, $methodName = null)
411 1
    {
412 1
        /** @var EntityRepository $repository */
413 1
        $repository = $this->getRepository();
414 1
        $queryBuilder = $repository->createQueryBuilder('j');
415 1
        $this->addStandardPredicate($queryBuilder);
416 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
417 1
        $queryBuilder->select('count(j.id)');
418 1
419 1
        return $queryBuilder->getQuery()->getSingleScalarResult();
420 1
    }
421
422 1
    /**
423
     * @param string        $workerName
424
     * @param string        $methodName
425 1
     * @param callable|null $progressCallback
426
     */
427
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
428 1
    {
429 1
        // First mark all Live non-running jobs as Archive
430 1
        $repository = $this->getRepository();
431
        /** @var QueryBuilder $queryBuilder */
432 1
        $queryBuilder = $repository->createQueryBuilder('j');
433
        $queryBuilder->update($this->getJobClass(), 'j')
434 1
            ->set('j.status', ':statusArchive')
435 1
            ->setParameter(':statusArchive', Job::STATUS_ARCHIVE);
436 1
        $this->addStandardPredicate($queryBuilder);
437
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
438
        $resultCount = $queryBuilder->getQuery()->execute();
439
440
        if ($resultCount) {
441
            $this->runArchive($workerName, $methodName, $progressCallback);
442
        }
443
    }
444
445
    /**
446
     * Move jobs in 'archive' status to the archive table.
447
     *
448
     *  This is a bit of a hack to run a lower level query so as to process the INSERT INTO SELECT
449
     *   All on the server as "INSERT INTO SELECT" is not supported natively in Doctrine.
450 2
     *
451
     * @param string|null   $workerName
452
     * @param string|null   $methodName
453 2
     * @param callable|null $progressCallback
454 2
     */
455 2
    protected function runArchive($workerName = null, $methodName = null, callable $progressCallback = null)
0 ignored issues
show
Unused Code introduced by
The parameter $workerName is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $methodName is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
456 2
    {
457 2
        /** @var EntityManager $entityManager */
458
        $entityManager = $this->getObjectManager();
459 2
        $count = 0;
460
        do {
461
            /** @var EntityRepository $repository */
462
            $repository = $this->getRepository();
463
            $queryBuilder = $repository->createQueryBuilder('j');
464
            $queryBuilder->where('j.status = :status')
465
                ->setParameter(':status', Job::STATUS_ARCHIVE)
466
                ->setMaxResults(10000);
467 1
468
            $results = $queryBuilder->getQuery()->getArrayResult();
469
            foreach ($results as $jobRow) {
470 1
                $job = $repository->find($jobRow['id']);
471
                if ($job) {
472 1
                    $entityManager->remove($job);
473 1
                }
474 1
                ++$count;
475 1
                if (0 == $count % 10) {
476 1
                    $this->flush();
477 1
                    $this->updateProgress($progressCallback, $count);
478 1
                }
479
            }
480 1
            $this->flush();
481 1
            $this->updateProgress($progressCallback, $count);
482 1
        } while (!empty($results) && 10000 == count($results));
483 1
    }
484
}
485