Passed
Push — master ( 82017d...f35517 )
by Matthew
08:19
created

JobManager   C

Complexity

Total Complexity 53

Size/Duplication

Total Lines 454
Duplicated Lines 0 %

Test Coverage

Coverage 95.22%

Importance

Changes 10
Bugs 4 Features 1
Metric Value
eloc 220
c 10
b 4
f 1
dl 0
loc 454
ccs 219
cts 230
cp 0.9522
rs 6.96
wmc 53

20 Methods

Rating   Name   Duplication   Size   Complexity  
A getObjectManager() 0 3 1
A pruneExceptionJobs() 0 12 1
A updateExpired() 0 16 1
A countJobsByStatus() 0 29 4
A addWorkerNameCriterion() 0 8 3
A countLiveJobs() 0 10 1
A updateBatchJob() 0 27 5
A archiveAllJobs() 0 15 2
A getStatus() 0 19 4
A getWaitingJobCount() 0 14 1
A getStatusByEntityName() 0 13 3
A takeJob() 0 27 3
A getJob() 0 20 5
A getWorkersAndMethods() 0 19 3
A updateNearestBatch() 0 34 4
A addStandardPredicates() 0 17 1
A getJobQueryBuilder() 0 16 2
A findRefresh() 0 11 2
A pruneArchivedJobs() 0 6 1
B runArchive() 0 28 6

How to fix   Complexity   

Complex Class

Complex classes like JobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use JobManager, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace Dtc\QueueBundle\ORM;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\EntityRepository;
7
use Doctrine\ORM\QueryBuilder;
8
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
9
use Dtc\QueueBundle\Entity\Job;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Util\Util;
13
14
class JobManager extends DoctrineJobManager
15
{
16
    use CommonTrait;
0 ignored issues
show
introduced by
The trait Dtc\QueueBundle\ORM\CommonTrait requires some properties which are not provided by Dtc\QueueBundle\ORM\JobManager: $generatorType, $idGenerator
Loading history...
17
    protected static $saveInsertCalled = null;
18
    protected static $resetInsertCalled = null;
19
20 31
    public function getObjectManager()
21
    {
22 31
        return $this->getObjectManagerReset();
23
    }
24
25 3
    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
26
    {
27
        /** @var EntityManager $objectManager */
28 3
        $objectManager = $this->getObjectManager();
29
30
        $queryBuilder = $objectManager
31 3
            ->createQueryBuilder()
32 3
            ->select('count(a.id)')
33 3
            ->from($objectName, 'a')
34 3
            ->where('a.status = :status');
35
36 3
        if (null !== $workerName) {
37 1
            $queryBuilder->andWhere('a.workerName = :workerName')
38 1
                ->setParameter('workerName', $workerName);
39
        }
40
41 3
        if (null !== $method) {
42 1
            $queryBuilder->andWhere('a.method = :method')
43 1
                ->setParameter('method', $workerName);
44
        }
45
46 3
        $count = $queryBuilder->setParameter('status', $status)
47 3
            ->getQuery()->getSingleScalarResult();
48
49 3
        if (!$count) {
50 1
            return 0;
51
        }
52
53 3
        return $count;
54
    }
55
56
    /**
57
     * @param string|null $workerName
58
     * @param string|null $method
59
     *
60
     * @return int Count of jobs pruned
61
     */
62 1
    public function pruneExceptionJobs($workerName = null, $method = null)
63
    {
64
        /** @var EntityManager $objectManager */
65 1
        $objectManager = $this->getObjectManager();
66 1
        $queryBuilder = $objectManager->createQueryBuilder()->delete($this->getJobArchiveClass(), 'j');
67 1
        $queryBuilder->where('j.status = :status')
68 1
            ->setParameter('status', BaseJob::STATUS_EXCEPTION);
69
70 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
71 1
        $query = $queryBuilder->getQuery();
72
73 1
        return intval($query->execute());
74
    }
75
76
    /**
77
     * @param string $workerName
78
     * @param string $method
79
     */
80 17
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
81
    {
82 17
        if (null !== $workerName) {
83 5
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter('workerName', $workerName);
84
        }
85
86 17
        if (null !== $method) {
87 4
            $queryBuilder->andWhere('j.method = :method')->setParameter('method', $method);
88
        }
89 17
    }
90
91 1
    protected function updateExpired($workerName = null, $method = null)
92
    {
93
        /** @var EntityManager $objectManager */
94 1
        $objectManager = $this->getObjectManager();
95 1
        $queryBuilder = $objectManager->createQueryBuilder()->update($this->getJobClass(), 'j');
96 1
        $queryBuilder->set('j.status', ':newStatus');
97 1
        $queryBuilder->where('j.expiresAt <= :expiresAt')
98 1
            ->setParameter('expiresAt', Util::getMicrotimeDateTime());
99 1
        $queryBuilder->andWhere('j.status = :status')
100 1
            ->setParameter('status', BaseJob::STATUS_NEW)
101 1
            ->setParameter('newStatus', Job::STATUS_EXPIRED);
102
103 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
104 1
        $query = $queryBuilder->getQuery();
105
106 1
        return intval($query->execute());
107
    }
108
109
    /**
110
     * Removes archived jobs older than $olderThan.
111
     */
112 1
    public function pruneArchivedJobs(\DateTime $olderThan)
113
    {
114 1
        return $this->removeOlderThan(
115 1
            $this->getJobArchiveClass(),
116 1
            'updatedAt',
117
            $olderThan
118
        );
119
    }
120
121 2
    public function getWaitingJobCount($workerName = null, $method = null)
122
    {
123
        /** @var EntityManager $objectManager */
124 2
        $objectManager = $this->getObjectManager();
125 2
        $queryBuilder = $objectManager->createQueryBuilder();
126
127 2
        $queryBuilder = $queryBuilder->select('count(j)')->from($this->getJobClass(), 'j');
128
129 2
        $this->addStandardPredicates($queryBuilder);
130 2
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
131
132 2
        $query = $queryBuilder->getQuery();
133
134 2
        return $query->getSingleScalarResult();
135
    }
136
137
    /**
138
     * @param string $workerName
139
     * @param string $methodName
140
     */
141 2
    public function countLiveJobs($workerName = null, $methodName = null)
142
    {
143
        /** @var EntityRepository $repository */
144 2
        $repository = $this->getRepository();
145 2
        $queryBuilder = $repository->createQueryBuilder('j');
146 2
        $this->addStandardPredicates($queryBuilder);
147 2
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
148 2
        $queryBuilder->select('count(j.id)');
149
150 2
        return $queryBuilder->getQuery()->getSingleScalarResult();
151
    }
152
153
    /**
154
     * Get Jobs statuses.
155
     */
156 3
    public function getStatus()
157
    {
158 3
        $result = [];
159 3
        $this->getStatusByEntityName($this->getJobClass(), $result);
160 3
        $this->getStatusByEntityName($this->getJobArchiveClass(), $result);
161
162 3
        $finalResult = [];
163 3
        foreach ($result as $key => $item) {
164 1
            ksort($item);
165 1
            foreach ($item as $status => $count) {
166 1
                if (isset($finalResult[$key][$status])) {
167
                    $finalResult[$key][$status] += $count;
168
                } else {
169 1
                    $finalResult[$key][$status] = $count;
170
                }
171
            }
172
        }
173
174 3
        return $finalResult;
175
    }
176
177
    /**
178
     * @param string $entityName
179
     */
180 3
    protected function getStatusByEntityName($entityName, array &$result)
181
    {
182
        /** @var EntityManager $objectManager */
183 3
        $objectManager = $this->getObjectManager();
184 3
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
185 3
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();
186
187 3
        foreach ($result1 as $item) {
188 1
            $method = $item['workerName'].'->'.$item['method'].'()';
189 1
            if (!isset($result[$method])) {
190 1
                $result[$method] = static::getAllStatuses();
191
            }
192 1
            $result[$method][$item['status']] += intval($item['c']);
193
        }
194 3
    }
195
196
    /**
197
     * Get the next job to run (can be filtered by workername and method name).
198
     *
199
     * @param string $workerName
200
     * @param string $methodName
201
     * @param bool   $prioritize
202
     * @param int    $runId
203
     *
204
     * @return Job|null
205
     */
206 14
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
207
    {
208
        do {
209 14
            $queryBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
210 14
            $queryBuilder->select('j.id');
211 14
            $queryBuilder->setMaxResults(100);
212
213
            /** @var QueryBuilder $queryBuilder */
214 14
            $query = $queryBuilder->getQuery();
215 14
            $jobs = $query->getResult();
216 14
            if (!empty($jobs)) {
217 11
                foreach ($jobs as $job) {
218 11
                    if ($job = $this->takeJob($job['id'], $runId)) {
219 11
                        return $job;
220
                    }
221
                }
222
            }
223 9
        } while (!empty($jobs));
224
225 9
        return null;
226
    }
227
228
    /**
229
     * @param string|null $workerName
230
     * @param string|null $methodName
231
     * @param bool        $prioritize
232
     *
233
     * @return QueryBuilder
234
     */
235 14
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
236
    {
237
        /** @var EntityRepository $repository */
238 14
        $repository = $this->getRepository();
239 14
        $queryBuilder = $repository->createQueryBuilder('j');
240 14
        $this->addStandardPredicates($queryBuilder);
241 14
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
242
243 14
        if ($prioritize) {
244 14
            $queryBuilder->addOrderBy('j.priority', 'DESC');
245 14
            $queryBuilder->addOrderBy('j.whenUs', 'ASC');
246
        } else {
247 1
            $queryBuilder->orderBy('j.whenUs', 'ASC');
248
        }
249
250 14
        return $queryBuilder;
251
    }
252
253 17
    protected function addStandardPredicates(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
254
    {
255 17
        $dateTime = Util::getMicrotimeDateTime();
256 17
        $microtimeInteger = Util::getMicrotimeIntegerFormat($dateTime);
257
258
        $queryBuilder
259 17
            ->andWhere('j.status = :status')->setParameter('status', $status)
260 17
            ->andWhere($queryBuilder->expr()->orX(
261 17
                $queryBuilder->expr()->isNull('j.whenUs'),
262 17
                $queryBuilder->expr()->lte('j.whenUs', ':whenUs')
263
            ))
264 17
            ->andWhere($queryBuilder->expr()->orX(
265 17
                $queryBuilder->expr()->isNull('j.expiresAt'),
266 17
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
267
            ))
268 17
            ->setParameter('whenUs', $microtimeInteger)
269 17
            ->setParameter('expiresAt', $dateTime);
270 17
    }
271
272
    /**
273
     * @param int $runId
274
     */
275 11
    protected function takeJob($jobId, $runId = null)
276
    {
277
        /** @var EntityRepository $repository */
278 11
        $repository = $this->getRepository();
279
        /** @var QueryBuilder $queryBuilder */
280 11
        $queryBuilder = $repository->createQueryBuilder('j');
281
        $queryBuilder
282 11
            ->update()
283 11
            ->set('j.status', ':status')
284 11
            ->setParameter('status', BaseJob::STATUS_RUNNING);
285 11
        if (null !== $runId) {
286
            $queryBuilder
287 1
                ->set('j.runId', ':runId')
288 1
                ->setParameter('runId', $runId);
289
        }
290 11
        $queryBuilder->set('j.startedAt', ':startedAt')
291 11
            ->setParameter('startedAt', Util::getMicrotimeDateTime());
292 11
        $queryBuilder->where('j.id = :id');
293 11
        $queryBuilder->setParameter('id', $jobId);
294 11
        $queryBuilder->andWhere('j.status = :statusNew')->setParameter('statusNew', BaseJob::STATUS_NEW);
295 11
        $resultCount = $queryBuilder->getQuery()->execute();
296
297 11
        if (1 === $resultCount) {
298 11
            return $this->findRefresh($jobId);
299
        }
300
301
        return null;
302
    }
303
304 11
    protected function findRefresh($id)
305
    {
306
        /** @var EntityManager $entityManager */
307 11
        $entityManager = $this->getObjectManager();
308 11
        if (($job = $entityManager->getUnitOfWork()->tryGetById(['id' => $id], $this->getJobClass())) instanceof Job) {
309 11
            $entityManager->refresh($job);
310
311 11
            return $job;
312
        }
313
314
        return $this->getRepository()->find($id);
315
    }
316
317
    /**
318
     * Tries to update the nearest job as a batch.
319
     *
320
     * @return Job|null
321
     */
322 1
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
323
    {
324 1
        if (!$job instanceof Job) {
325
            throw new UnsupportedException('$job must be instance of '.Job::class);
326
        }
327
328
        /** @var QueryBuilder $queryBuilder */
329 1
        $queryBuilder = $this->getRepository()->createQueryBuilder('j');
330 1
        $queryBuilder->select()
331 1
            ->where('j.crcHash = :crcHash')
332 1
            ->andWhere('j.status = :status')
333 1
            ->setParameter('status', BaseJob::STATUS_NEW)
334 1
            ->setParameter('crcHash', $job->getCrcHash())
335 1
            ->orderBy('j.whenUs', 'ASC')
336 1
            ->setMaxResults(1);
337 1
        $existingJobs = $queryBuilder->getQuery()->execute();
338
339 1
        if (empty($existingJobs)) {
340
            return null;
341
        }
342
343
        /** @var Job $existingJob */
344 1
        $existingJob = $existingJobs[0];
345
346 1
        $newPriority = max($job->getPriority(), $existingJob->getPriority());
347 1
        $newWhenUs = $existingJob->getWhenUs();
348 1
        $bcResult = bccomp($job->getWhenUs(), $existingJob->getWhenUs());
349 1
        if ($bcResult < 0) {
350 1
            $newWhenUs = $job->getWhenUs();
351
        }
352
353 1
        $this->updateBatchJob($existingJob, $newPriority, $newWhenUs);
354
355 1
        return $existingJob;
356
    }
357
358
    /**
359
     * @param int    $newPriority
360
     * @param string $newWhenUs
361
     */
362 1
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
363
    {
364 1
        $existingPriority = $existingJob->getPriority();
365 1
        $existingWhenUs = $existingJob->getWhenUs();
366
367 1
        if ($newPriority !== $existingPriority || $newWhenUs !== $existingWhenUs) {
368
            /** @var EntityRepository $repository */
369 1
            $repository = $this->getRepository();
370
            /** @var QueryBuilder $queryBuilder */
371 1
            $queryBuilder = $repository->createQueryBuilder('j');
372 1
            $queryBuilder->update();
373 1
            if ($newPriority !== $existingPriority) {
374 1
                $existingJob->setPriority($newPriority);
375 1
                $queryBuilder->set('j.priority', ':priority')
376 1
                    ->setParameter('priority', $newPriority);
377
            }
378 1
            if ($newWhenUs !== $existingWhenUs) {
379 1
                $existingJob->setWhenUs($newWhenUs);
380 1
                $queryBuilder->set('j.whenUs', ':whenUs')
381 1
                    ->setParameter('whenUs', $newWhenUs);
382
            }
383 1
            $queryBuilder->where('j.id = :id');
384 1
            $queryBuilder->setParameter('id', $existingJob->getId());
385 1
            $queryBuilder->getQuery()->execute();
386
        }
387
388 1
        return $existingJob;
389
    }
390
391 2
    public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
392
    {
393
        /** @var EntityRepository $repository */
394 2
        $repository = $this->getRepository();
395 2
        $queryBuilder = $repository->createQueryBuilder('j');
396 2
        $this->addStandardPredicates($queryBuilder, $status);
397
        $queryBuilder
398 2
            ->select('DISTINCT j.workerName, j.method');
399
400 2
        $results = $queryBuilder->getQuery()->getArrayResult();
401 2
        if (empty($results)) {
402 2
            return [];
403
        }
404
        $workerMethods = [];
405
        foreach ($results as $result) {
406
            $workerMethods[$result['workerName']][] = $result['method'];
407
        }
408
409
        return $workerMethods;
410
    }
411
412
    /**
413
     * @param string $workerName
414
     * @param string $methodName
415
     */
416 1
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
417
    {
418
        // First mark all Live non-running jobs as Archive
419 1
        $repository = $this->getRepository();
420
        /** @var QueryBuilder $queryBuilder */
421 1
        $queryBuilder = $repository->createQueryBuilder('j');
422 1
        $queryBuilder->update($this->getJobClass(), 'j')
423 1
            ->set('j.status', ':statusArchive')
424 1
            ->setParameter('statusArchive', Job::STATUS_ARCHIVE);
425 1
        $this->addStandardPredicates($queryBuilder);
426 1
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
427 1
        $resultCount = $queryBuilder->getQuery()->execute();
428
429 1
        if ($resultCount) {
430 1
            $this->runArchive($progressCallback);
431
        }
432 1
    }
433
434
    /**
435
     * Move jobs in 'archive' status to the archive table.
436
     *
437
     *  This is a bit of a hack to run a lower level query so as to process the INSERT INTO SELECT
438
     *   All on the server as "INSERT INTO SELECT" is not supported natively in Doctrine.
439
     */
440 1
    protected function runArchive(callable $progressCallback = null)
441
    {
442
        /** @var EntityManager $entityManager */
443 1
        $entityManager = $this->getObjectManager();
444 1
        $count = 0;
445
        do {
446
            /** @var EntityRepository $repository */
447 1
            $repository = $this->getRepository();
448 1
            $queryBuilder = $repository->createQueryBuilder('j');
449 1
            $queryBuilder->where('j.status = :status')
450 1
                ->setParameter('status', Job::STATUS_ARCHIVE)
451 1
                ->setMaxResults(10000);
452
453 1
            $results = $queryBuilder->getQuery()->getArrayResult();
454 1
            foreach ($results as $jobRow) {
455 1
                $job = $repository->find($jobRow['id']);
456 1
                if ($job) {
457 1
                    $entityManager->remove($job);
458
                }
459 1
                ++$count;
460 1
                if (0 == $count % 10) {
461
                    $this->flush();
462
                    $this->updateProgress($progressCallback, $count);
463
                }
464
            }
465 1
            $this->flush();
466 1
            $this->updateProgress($progressCallback, $count);
467 1
        } while (!empty($results) && 10000 == count($results));
468 1
    }
469
}
470