Completed
Pull Request — master (#53)
by Matthew
08:16
created

JobManager::addWorkerNameCriterion()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 4
nop 3
crap 3
1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Util\Util;
13
use Symfony\Component\Process\Exception\LogicException;
14
15
class JobManager extends DoctrineJobManager
16
{
17
    use CommonTrait;
18
    protected static $saveInsertCalled = null;
19
    protected static $resetInsertCalled = null;
20
21 32
    /**
     * Returns the object manager this job manager works with.
     *
     * Simply delegates to getObjectManagerReset(), which is not defined in
     * this class (presumably supplied by CommonTrait -- TODO confirm).
     *
     * @return mixed Whatever getObjectManagerReset() returns; the rest of this
     *               class treats it as a Doctrine EntityManager
     */
    public function getObjectManager()
    {
        $objectManager = $this->getObjectManagerReset();

        return $objectManager;
    }
25
26 3
    /**
     * Counts the rows of the given entity that are in the given status,
     * optionally restricted to one worker name and/or one method.
     *
     * @param string      $objectName Entity class name to count rows of
     * @param mixed       $status     Status value the rows must have
     * @param string|null $workerName Only count rows with this worker name (null = any)
     * @param string|null $method     Only count rows with this method (null = any)
     *
     * @return mixed The scalar count as returned by Doctrine, or int 0 when that value is falsy
     */
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $queryBuilder = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status')
            ->setParameter(':status', $status);

        if (null !== $workerName) {
            $queryBuilder->andWhere('a.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            // Bind the method name itself; binding the worker name here made
            // the method filter compare against the wrong value.
            $queryBuilder->andWhere('a.method = :method')
                ->setParameter(':method', $method);
        }

        $count = $queryBuilder->getQuery()->getSingleScalarResult();

        if (!$count) {
            return 0;
        }

        return $count;
    }
56
57
    /**
     * Deletes rows of the job archive class whose status is the exception
     * status, optionally limited to one worker name and/or method.
     *
     * @param string|null $workerName Only prune jobs of this worker (null = any)
     * @param string|null $method     Only prune jobs of this method (null = any)
     *
     * @return int Count of jobs pruned
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $deleteBuilder = $entityManager->createQueryBuilder();
        $deleteBuilder->delete($this->getJobArchiveClass(), 'j');
        $deleteBuilder->where('j.status = :status');
        $deleteBuilder->setParameter(':status', BaseJob::STATUS_EXCEPTION);
        $this->addWorkerNameCriterion($deleteBuilder, $workerName, $method);

        $deleted = $deleteBuilder->getQuery()->execute();

        return intval($deleted);
    }
76
77 21
    /**
     * Guards against mixing "save" and non-"save" (reset) calls on the same
     * object manager instance.
     *
     * The spl_object_hash() of the current object manager is remembered in a
     * static property per call kind. If the opposite kind was already recorded
     * for the same hash, the call is rejected.
     *
     * @param string $function 'save' for a save call; any other value is treated as the reset side
     *
     * @throws LogicException When the opposite call kind was already recorded for this object manager
     */
    protected function resetSaveOk($function)
    {
        $managerHash = spl_object_hash($this->getObjectManager());
        $isSave = 'save' === $function;

        // A save conflicts with an earlier reset and vice versa.
        $conflictingHash = $isSave ? static::$resetInsertCalled : static::$saveInsertCalled;

        if ($managerHash === $conflictingHash) {
            // Insert SQL is cached...
            throw new LogicException("Can't call save and reset within the same process cycle (or using the same EntityManager)");
        }

        if ($isSave) {
            static::$saveInsertCalled = $managerHash;

            return;
        }

        static::$resetInsertCalled = $managerHash;
    }
100
101
    /**
     * Appends optional worker-name and method filters (on alias "j") to the
     * given query builder. A null value means "do not filter on this field".
     *
     * @param QueryBuilder $queryBuilder Builder to add the predicates to
     * @param string|null  $workerName   Worker name to match, or null
     * @param string|null  $method       Method name to match, or null
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        $filters = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($filters as $field => $value) {
            if (null === $value) {
                continue;
            }

            // Produces e.g. "j.workerName = :workerName" bound under ":workerName".
            $queryBuilder
                ->andWhere('j.'.$field.' = :'.$field)
                ->setParameter(':'.$field, $value);
        }
    }
115
116 1
    /**
     * Sets the status of jobs (job class) to expired when they are still in
     * the new status and their expiresAt is at or before the current time,
     * optionally limited to one worker name and/or method.
     *
     * @param string|null $workerName Only update jobs of this worker (null = any)
     * @param string|null $method     Only update jobs of this method (null = any)
     *
     * @return int Result of executing the update query, converted with intval()
     */
    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $updateBuilder = $entityManager->createQueryBuilder();
        $updateBuilder
            ->update($this->getJobClass(), 'j')
            ->set('j.status', ':newStatus')
            ->where('j.expiresAt <= :expiresAt')
            ->setParameter(':expiresAt', Util::getMicrotimeDateTime())
            ->andWhere('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_NEW)
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);

        $this->addWorkerNameCriterion($updateBuilder, $workerName, $method);

        $affected = $updateBuilder->getQuery()->execute();

        return intval($affected);
    }
133
134
    /**
     * Removes archived jobs older than $olderThan, judged by their
     * "updatedAt" field.
     *
     * @param \DateTime $olderThan Cut-off passed on to removeOlderThan()
     *
     * @return mixed Whatever removeOlderThan() returns (not defined in this class)
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        $archiveClass = $this->getJobArchiveClass();

        return $this->removeOlderThan($archiveClass, 'updatedAt', $olderThan);
    }
147
148 2 View Code Duplication
    /**
     * Counts the jobs (job class) that match the standard "ready to run"
     * predicate, optionally limited to one worker name and/or method.
     *
     * @param string|null $workerName Only count jobs of this worker (null = any)
     * @param string|null $method     Only count jobs of this method (null = any)
     *
     * @return mixed The scalar count as returned by Doctrine
     */
    public function getWaitingJobCount($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $queryBuilder = $objectManager->createQueryBuilder();

        $queryBuilder = $queryBuilder->select('count(j)')->from($this->getJobClass(), 'j');

        // The standard predicate must be added first, as the other query
        // methods of this class do: if the worker/method filters were added
        // before it and it replaced the WHERE part, those filters would be
        // lost while their parameters stayed bound.
        $this->addStandardPredicate($queryBuilder);
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);

        $query = $queryBuilder->getQuery();

        return $query->getSingleScalarResult();
    }
163
164
    /**
     * Get Jobs statuses.
     *
     * Collects per-status counts from both the job class and the job archive
     * class, then returns them with each entry's statuses sorted by key.
     *
     * @return array Map of "worker->method()" key to a map of status => count
     */
    public function getStatus()
    {
        $collected = [];
        foreach ([$this->getJobClass(), $this->getJobArchiveClass()] as $entityName) {
            $this->getStatusByEntityName($entityName, $collected);
        }

        $finalResult = [];
        foreach ($collected as $key => $statusCounts) {
            ksort($statusCounts);
            foreach ($statusCounts as $status => $count) {
                if (!isset($finalResult[$key][$status])) {
                    $finalResult[$key][$status] = $count;
                    continue;
                }

                $finalResult[$key][$status] += $count;
            }
        }

        return $finalResult;
    }
187
188
    /**
     * Adds the per-status row counts of one entity to $result.
     *
     * Rows are grouped by worker name, method and status. Each
     * "worker->method()" key is initialised from static::getAllStatuses() the
     * first time it is seen, then the grouped count is added to its status slot.
     *
     * @param string $entityName Entity class name to aggregate
     * @param array  $result     Accumulator, modified in place
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();

        $queryBuilder = $entityManager->getRepository($entityName)->createQueryBuilder('j');
        $queryBuilder
            ->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status');
        $rows = $queryBuilder->getQuery()->getArrayResult();

        foreach ($rows as $row) {
            $key = $row['workerName'].'->'.$row['method'].'()';
            if (!isset($result[$key])) {
                $result[$key] = static::getAllStatuses();
            }
            $result[$key][$row['status']] += intval($row['c']);
        }
    }
206
207
    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * Fetches up to 100 candidate job ids at a time and tries to take each one
     * in order; the first job successfully taken is returned. When a whole
     * batch could not be taken, a fresh batch is queried. The search ends once
     * the candidate query comes back empty.
     *
     * @param string|null $workerName Only consider jobs of this worker (null = any)
     * @param string|null $methodName Only consider jobs of this method (null = any)
     * @param bool        $prioritize Passed on to getJobQueryBuilder() to choose the ordering
     * @param int|null    $runId      Run id passed on to takeJob()
     *
     * @return Job|null
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        while (true) {
            /** @var QueryBuilder $queryBuilder */
            $queryBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
            $queryBuilder->select('j.id');
            $queryBuilder->setMaxResults(100);

            $candidates = $queryBuilder->getQuery()->getResult();
            if (empty($candidates)) {
                return null;
            }

            foreach ($candidates as $candidate) {
                $takenJob = $this->takeJob($candidate['id'], $runId);
                if ($takenJob) {
                    return $takenJob;
                }
            }
        }
    }
238
239
    /**
     * Builds the query for runnable jobs: the standard predicate plus the
     * optional worker/method filters, with an ordering chosen by $prioritize.
     *
     * @param string|null $workerName Only match jobs of this worker (null = any)
     * @param string|null $methodName Only match jobs of this method (null = any)
     * @param bool        $prioritize True: order by priority DESC, then whenUs ASC;
     *                                false: order by whenUs ASC only
     *
     * @return QueryBuilder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $builder = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($builder);
        $this->addWorkerNameCriterion($builder, $workerName, $methodName);

        if (!$prioritize) {
            $builder->orderBy('j.whenUs', 'ASC');

            return $builder;
        }

        $builder->addOrderBy('j.priority', 'DESC');
        $builder->addOrderBy('j.whenUs', 'ASC');

        return $builder;
    }
263
264 15
    /**
     * Adds the standard "ready to run" predicate (on alias "j") to the builder:
     * status equals $status, whenUs is null or at/before now, and expiresAt is
     * null or after now.
     *
     * The conditions are added with andWhere(), so predicates already present
     * on the builder are kept regardless of the order in which callers add
     * their criteria.
     *
     * @param QueryBuilder $queryBuilder Builder to add the predicate to
     * @param mixed        $status       Status the jobs must have (defaults to the new status)
     */
    protected function addStandardPredicate(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
    {
        $dateTime = Util::getMicrotimeDateTime();
        $decimal = Util::getMicrotimeDecimalFormat($dateTime);

        $queryBuilder
            // andWhere() rather than where(): where() would replace any
            // predicate added before this call.
            ->andWhere('j.status = :status')->setParameter(':status', $status)
            ->andWhere($queryBuilder->expr()->orX(
                $queryBuilder->expr()->isNull('j.whenUs'),
                $queryBuilder->expr()->lte('j.whenUs', ':whenUs')
            ))
            ->andWhere($queryBuilder->expr()->orX(
                $queryBuilder->expr()->isNull('j.expiresAt'),
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
            ))
            ->setParameter(':whenUs', $decimal)
            ->setParameter(':expiresAt', $dateTime);
    }
282
283
    /**
     * Tries to claim one job by switching it from the new status to running
     * with a single conditional UPDATE, recording the start time and, when
     * given, the run id.
     *
     * The UPDATE only matches while the job is still in the new status, so
     * when several workers selected the same job id only one of them gets an
     * affected-row count of 1 and thereby the job.
     *
     * @param mixed    $jobId Identifier of the job to claim
     * @param int|null $runId Run id to store on the job (null = leave untouched)
     *
     * @return Job|null The claimed job, or null when the update did not affect exactly one row
     */
    protected function takeJob($jobId, $runId = null)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $repository->createQueryBuilder('j');
        $queryBuilder
            ->update()
            ->set('j.status', ':status')
            ->setParameter(':status', BaseJob::STATUS_RUNNING);
        if (null !== $runId) {
            $queryBuilder
                ->set('j.runId', ':runId')
                ->setParameter(':runId', $runId);
        }
        $queryBuilder->set('j.startedAt', ':startedAt')
            ->setParameter(':startedAt', Util::getMicrotimeDateTime());

        // Filtering on id alone would let a second worker "take" a job that
        // is already running; require that it is still new.
        $queryBuilder->where('j.id = :id')
            ->andWhere('j.status = :expectedStatus')
            ->setParameter(':id', $jobId)
            ->setParameter(':expectedStatus', BaseJob::STATUS_NEW);
        $resultCount = $queryBuilder->getQuery()->execute();

        if (1 === $resultCount) {
            // NOTE(review): find() may serve an entity already held by the
            // object manager; confirm it reflects the updated status.
            return $repository->find($jobId);
        }

        return null;
    }
313
314
    /**
     * Tries to update the nearest job as a batch.
     *
     * Looks up the earliest (by whenUs) job in the new status that has the
     * same crcHash as $job. If one exists, it is updated to the higher of the
     * two priorities and the earlier of the two whenUs values.
     *
     * @param \Dtc\QueueBundle\Model\Job $job Job to fold into an existing one
     *
     * @return null|Job The existing job that was matched, or null when none was found
     *
     * @throws UnsupportedException When $job is not an ORM Job entity
     */
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new UnsupportedException('$job must be instance of '.Job::class);
        }

        /** @var QueryBuilder $lookup */
        $lookup = $this->getRepository()->createQueryBuilder('j');
        $lookup->select();
        $lookup->where('j.crcHash = :crcHash');
        $lookup->andWhere('j.status = :status');
        $lookup->setParameter(':status', BaseJob::STATUS_NEW);
        $lookup->setParameter(':crcHash', $job->getCrcHash());
        $lookup->orderBy('j.whenUs', 'ASC');
        $lookup->setMaxResults(1);

        $matches = $lookup->getQuery()->execute();
        if (empty($matches)) {
            return null;
        }

        /** @var Job $existingJob */
        $existingJob = $matches[0];

        $newPriority = max($job->getPriority(), $existingJob->getPriority());

        // Keep the existing time unless the incoming job is due strictly earlier.
        $incomingIsEarlier = bccomp($job->getWhenUs(), $existingJob->getWhenUs()) < 0;
        $newWhenUs = $incomingIsEarlier ? $job->getWhenUs() : $existingJob->getWhenUs();

        $this->updateBatchJob($existingJob, $newPriority, $newWhenUs);

        return $existingJob;
    }
356
357
    /**
     * Applies a new priority and/or whenUs to an existing job, both on the
     * entity object and through an UPDATE query, touching only the fields
     * whose value actually differs (strict comparison).
     *
     * @param Job    $existingJob Job to update
     * @param int    $newPriority Priority the job should have
     * @param string $newWhenUs   whenUs value the job should have
     *
     * @return Job The same $existingJob instance
     */
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
    {
        $priorityChanged = $newPriority !== $existingJob->getPriority();
        $whenUsChanged = $newWhenUs !== $existingJob->getWhenUs();

        // Nothing differs: no query is issued.
        if (!$priorityChanged && !$whenUsChanged) {
            return $existingJob;
        }

        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $updateBuilder */
        $updateBuilder = $repository->createQueryBuilder('j');
        $updateBuilder->update();

        if ($priorityChanged) {
            $existingJob->setPriority($newPriority);
            $updateBuilder->set('j.priority', ':priority');
            $updateBuilder->setParameter(':priority', $newPriority);
        }

        if ($whenUsChanged) {
            $existingJob->setWhenUs($newWhenUs);
            $updateBuilder->set('j.whenUs', ':whenUs');
            $updateBuilder->setParameter(':whenUs', $newWhenUs);
        }

        $updateBuilder->where('j.id = :id');
        $updateBuilder->setParameter(':id', $existingJob->getId());
        $updateBuilder->getQuery()->execute();

        return $existingJob;
    }
389
390 2
    /**
     * Lists the distinct worker name / method pairs of jobs matching the
     * standard predicate for the given status.
     *
     * @param mixed $status Status the jobs must have (defaults to the new status)
     *
     * @return array Map of worker name to a list of its method names; empty when nothing matches
     */
    public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $builder = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($builder, $status);
        $builder->select('DISTINCT j.workerName, j.method');

        $rows = $builder->getQuery()->getArrayResult();

        // With no rows the loop body never runs and the empty array is returned.
        $methodsByWorker = [];
        foreach ($rows as $row) {
            $methodsByWorker[$row['workerName']][] = $row['method'];
        }

        return $methodsByWorker;
    }
410
411
    /**
     * Counts the jobs that match the standard predicate, optionally limited
     * to one worker name and/or method.
     *
     * @param string|null $workerName Only count jobs of this worker (null = any)
     * @param string|null $methodName Only count jobs of this method (null = any)
     *
     * @return mixed The scalar count as returned by Doctrine
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();

        $counter = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($counter);
        $this->addWorkerNameCriterion($counter, $workerName, $methodName);
        $counter->select('count(j.id)');

        $query = $counter->getQuery();

        return $query->getSingleScalarResult();
    }
426
427
    /**
     * Flags every job matching the standard predicate (optionally limited to
     * one worker name and/or method) with the archive status and, when the
     * update reports a truthy result, runs runArchive() to process the
     * flagged jobs.
     *
     * @param string|null   $workerName       Only archive jobs of this worker (null = any)
     * @param string|null   $methodName       Only archive jobs of this method (null = any)
     * @param callable|null $progressCallback Passed on to runArchive()
     */
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        // First mark all Live non-running jobs as Archive
        $repository = $this->getRepository();
        /** @var QueryBuilder $flagBuilder */
        $flagBuilder = $repository->createQueryBuilder('j');
        $flagBuilder->update($this->getJobClass(), 'j');
        $flagBuilder->set('j.status', ':statusArchive');
        $flagBuilder->setParameter(':statusArchive', Job::STATUS_ARCHIVE);
        $this->addStandardPredicate($flagBuilder);
        $this->addWorkerNameCriterion($flagBuilder, $workerName, $methodName);

        $flagged = $flagBuilder->getQuery()->execute();
        if (!$flagged) {
            return;
        }

        $this->runArchive($workerName, $methodName, $progressCallback);
    }
449
450
    /**
     * Processes jobs that are in the 'archive' status.
     *
     * Jobs are read in batches of up to 10000 rows. Each row's entity is
     * looked up and, when found, removed through the entity manager. The
     * manager is flushed and progress reported after every 10 processed rows
     * and once more at the end of each batch. Another batch is fetched only
     * when the previous one was full.
     *
     * NOTE(review): the actual move into the archive table is not visible in
     * this method; presumably it happens as a side effect of remove()/flush()
     * -- confirm.
     *
     * @param string|null   $workerName       Not used by this method
     * @param string|null   $methodName       Not used by this method
     * @param callable|null $progressCallback Passed to updateProgress() with the running count
     */
    protected function runArchive($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $batchSize = 10000;
        $processed = 0;

        do {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            $rows = $repository->createQueryBuilder('j')
                ->where('j.status = :status')
                ->setParameter(':status', Job::STATUS_ARCHIVE)
                ->setMaxResults($batchSize)
                ->getQuery()
                ->getArrayResult();

            foreach ($rows as $row) {
                $job = $repository->find($row['id']);
                if ($job) {
                    $entityManager->remove($job);
                }

                // Rows whose entity was not found still count as processed.
                ++$processed;
                if (0 === $processed % 10) {
                    $this->flush();
                    $this->updateProgress($progressCallback, $processed);
                }
            }

            $this->flush();
            $this->updateProgress($progressCallback, $processed);
        } while ($batchSize === count($rows)); // a short batch means nothing is left
    }
489
}
490