Completed
Pull Request — master (#40)
by Matthew
07:31
created

JobManager   B

Complexity

Total Complexity 54

Size/Duplication

Total Lines 470
Duplicated Lines 3.19 %

Coupling/Cohesion

Components 1
Dependencies 11

Test Coverage

Coverage 94.49%

Importance

Changes 0
Metric Value
wmc 54
lcom 1
cbo 11
dl 15
loc 470
ccs 223
cts 236
cp 0.9449
rs 7.0642
c 0
b 0
f 0
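
Two of the headline figures above can be reproduced from the raw metric values. The sketch below assumes that `cp` is `ccs` divided by `cts` (covered statements over total statements) and that the duplication percentage is `dl` divided by `loc`. Those readings are inferred from the numbers, not from tool documentation.

```php
<?php

// Values copied from the metric table above.
$metrics = ['ccs' => 223, 'cts' => 236, 'dl' => 15, 'loc' => 470];

// Assumption: cp is covered statements divided by total statements.
$coverage = $metrics['ccs'] / $metrics['cts'];

// Assumption: the duplication percentage is duplicated lines over total lines.
$duplication = $metrics['dl'] / $metrics['loc'];

printf("coverage: %.2f%%\n", $coverage * 100);       // coverage: 94.49%
printf("duplication: %.2f%%\n", $duplication * 100); // duplication: 3.19%
```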

19 Methods

Rating   Name   Duplication   Size   Complexity  
B countJobsByStatus() 0 30 4
B resetSaveOk() 0 23 4
A addWorkerNameCriterion() 0 10 3
A updateExpired() 0 17 1
A pruneExceptionJobs() 0 13 1
A pruneArchivedJobs() 0 8 1
A getWaitingJobCount() 15 15 1
A countLiveJobs() 0 11 1
A getStatus() 0 20 4
A getStatusByEntityName() 0 15 3
B getJob() 0 21 5
A getJobQueryBuilder() 0 17 2
A addStandardPredicate() 0 18 1
B takeJob() 0 27 3
B updateNearestBatch() 0 35 4
B updateBatchJob() 0 28 5
A getWorkersAndMethods() 0 20 3
A archiveAllJobs() 0 17 2
B runArchive() 0 29 6


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complex Class

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like JobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes. You can also look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use JobManager, and based on these observations, apply Extract Interface, too.
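
As a rough illustration of Extract Class combined with Extract Interface, the sketch below moves a counting responsibility into its own class behind a narrow interface. Every name in it (`JobStatusCounter`, `ArrayJobStatusCounter`, `StatusReport`) is hypothetical and not part of Dtc\QueueBundle, and a plain array stands in for the database so the example runs on its own.

```php
<?php

// All names below are hypothetical and do not exist in Dtc\QueueBundle.

// Extract Interface: the narrow contract a caller actually needs.
interface JobStatusCounter
{
    public function countJobsByStatus(string $status): int;
}

// Extract Class: the counting responsibility lives in its own small class.
final class ArrayJobStatusCounter implements JobStatusCounter
{
    /** @var string[] */
    private $statuses;

    public function __construct(array $statuses)
    {
        $this->statuses = $statuses;
    }

    public function countJobsByStatus(string $status): int
    {
        // array_keys() with a search value returns only the matching keys.
        return count(array_keys($this->statuses, $status, true));
    }
}

// A caller that depends on the interface rather than on the full manager.
final class StatusReport
{
    /** @var JobStatusCounter */
    private $counter;

    public function __construct(JobStatusCounter $counter)
    {
        $this->counter = $counter;
    }

    public function line(string $status): string
    {
        return sprintf('%s: %d', $status, $this->counter->countJobsByStatus($status));
    }
}

$report = new StatusReport(new ArrayJobStatusCounter(['new', 'new', 'running']));
echo $report->line('new'), PHP_EOL; // new: 2
```

Callers typed against the interface can be tested without a database, and the manager class shrinks by whatever moves behind it.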

<?php

namespace Dtc\QueueBundle\ORM;

use Doctrine\ORM\EntityManager;
use Doctrine\ORM\EntityRepository;
use Doctrine\ORM\QueryBuilder;
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
use Dtc\QueueBundle\Entity\Job;
use Dtc\QueueBundle\Exception\UnsupportedException;
use Dtc\QueueBundle\Model\BaseJob;
use Dtc\QueueBundle\Util\Util;
use Symfony\Component\Process\Exception\LogicException;

class JobManager extends DoctrineJobManager
{
    use CommonTrait;
    protected static $saveInsertCalled = null;
    protected static $resetInsertCalled = null;

    public function countJobsByStatus($objectName, $status, $workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();

        $queryBuilder = $objectManager
            ->createQueryBuilder()
            ->select('count(a.id)')
            ->from($objectName, 'a')
            ->where('a.status = :status');

        if (null !== $workerName) {
            $queryBuilder->andWhere('a.workerName = :workerName')
                ->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            // Filter on the method name.
            $queryBuilder->andWhere('a.method = :method')
                ->setParameter(':method', $method);
        }

        $count = $queryBuilder->setParameter(':status', $status)
            ->getQuery()->getSingleScalarResult();

        if (!$count) {
            return 0;
        }

        return $count;
    }

    /**
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Count of jobs pruned
     */
    public function pruneExceptionJobs($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $queryBuilder = $objectManager->createQueryBuilder()->delete($this->getJobArchiveClass(), 'j');
        $queryBuilder->where('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_EXCEPTION);

        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
        $query = $queryBuilder->getQuery();

        return intval($query->execute());
    }

    protected function resetSaveOk($function)
    {
        $objectManager = $this->getObjectManager();
        $splObjectHash = spl_object_hash($objectManager);

        if ('save' === $function) {
            $compare = static::$resetInsertCalled;
        } else {
            $compare = static::$saveInsertCalled;
        }

        if ($splObjectHash === $compare) {
            // Insert SQL is cached...
            $msg = "Can't call save and reset within the same process cycle (or using the same EntityManager)";
            throw new LogicException($msg);
        }

        if ('save' === $function) {
            static::$saveInsertCalled = spl_object_hash($objectManager);
        } else {
            static::$resetInsertCalled = spl_object_hash($objectManager);
        }
    }

    /**
     * @param string $workerName
     * @param string $method
     */
    protected function addWorkerNameCriterion(QueryBuilder $queryBuilder, $workerName = null, $method = null)
    {
        if (null !== $workerName) {
            $queryBuilder->andWhere('j.workerName = :workerName')->setParameter(':workerName', $workerName);
        }

        if (null !== $method) {
            $queryBuilder->andWhere('j.method = :method')->setParameter(':method', $method);
        }
    }

    protected function updateExpired($workerName = null, $method = null)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $queryBuilder = $objectManager->createQueryBuilder()->update($this->getJobClass(), 'j');
        $queryBuilder->set('j.status', ':newStatus');
        $queryBuilder->where('j.expiresAt <= :expiresAt')
            ->setParameter(':expiresAt', Util::getMicrotimeDateTime());
        $queryBuilder->andWhere('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_NEW)
            ->setParameter(':newStatus', Job::STATUS_EXPIRED);

        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);
        $query = $queryBuilder->getQuery();

        return intval($query->execute());
    }

    /**
     * Removes archived jobs older than $olderThan.
     *
     * @param \DateTime $olderThan
     */
    public function pruneArchivedJobs(\DateTime $olderThan)
    {
        return $this->removeOlderThan(
            $this->getJobArchiveClass(),
            'updatedAt',
            $olderThan
        );
    }

    public function getWaitingJobCount($workerName = null, $method = null)
Duplication
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $queryBuilder = $objectManager->createQueryBuilder();

        $queryBuilder = $queryBuilder->select('count(j)')->from($this->getJobClass(), 'j');

        // addStandardPredicate() calls where(), which replaces earlier conditions,
        // so it has to run before the worker/method criteria are appended.
        $this->addStandardPredicate($queryBuilder);
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $method);

        $query = $queryBuilder->getQuery();

        return $query->getSingleScalarResult();
    }

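
One way to act on the duplication warning on getWaitingJobCount(), sketched under assumptions: if getWaitingJobCount() and countLiveJobs() are meant to apply the same filters, one can simply delegate to the other. The class below is a hypothetical stand-in. `JobCounts` is an invented name, jobs are plain arrays rather than Doctrine entities, and a simple `'new'` status check takes the place of addStandardPredicate(). It only shows the shape of the refactoring.

```php
<?php

// Hypothetical stand-in for the manager: jobs are plain arrays, not Doctrine entities.
final class JobCounts
{
    /** @var array[] */
    private $jobs;

    public function __construct(array $jobs)
    {
        $this->jobs = $jobs;
    }

    // Single place that knows how "live" jobs are filtered and counted.
    public function countLiveJobs(?string $workerName = null, ?string $method = null): int
    {
        $live = array_filter($this->jobs, function (array $job) use ($workerName, $method): bool {
            return 'new' === $job['status']
                && (null === $workerName || $workerName === $job['workerName'])
                && (null === $method || $method === $job['method']);
        });

        return count($live);
    }

    // Former duplicate: now a thin alias that reuses the shared operation.
    public function getWaitingJobCount(?string $workerName = null, ?string $method = null): int
    {
        return $this->countLiveJobs($workerName, $method);
    }
}

$counts = new JobCounts([
    ['workerName' => 'fibonacci', 'method' => 'run', 'status' => 'new'],
    ['workerName' => 'fibonacci', 'method' => 'run', 'status' => 'running'],
    ['workerName' => 'mailer', 'method' => 'send', 'status' => 'new'],
]);

var_dump($counts->getWaitingJobCount());            // int(2)
var_dump($counts->getWaitingJobCount('fibonacci')); // int(1)
```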
    /**
     * Get Jobs statuses.
     */
    public function getStatus()
    {
        $result = [];
        $this->getStatusByEntityName($this->getJobClass(), $result);
        $this->getStatusByEntityName($this->getJobArchiveClass(), $result);

        $finalResult = [];
        foreach ($result as $key => $item) {
            ksort($item);
            foreach ($item as $status => $count) {
                if (isset($finalResult[$key][$status])) {
                    $finalResult[$key][$status] += $count;
                } else {
                    $finalResult[$key][$status] = $count;
                }
            }
        }

        return $finalResult;
    }

    /**
     * @param string $entityName
     */
    protected function getStatusByEntityName($entityName, array &$result)
    {
        /** @var EntityManager $objectManager */
        $objectManager = $this->getObjectManager();
        $result1 = $objectManager->getRepository($entityName)->createQueryBuilder('j')->select('j.workerName, j.method, j.status, count(j) as c')
            ->groupBy('j.workerName, j.method, j.status')->getQuery()->getArrayResult();

        foreach ($result1 as $item) {
            $method = $item['workerName'].'->'.$item['method'].'()';
            if (!isset($result[$method])) {
                $result[$method] = static::getAllStatuses();
            }
            $result[$method][$item['status']] += intval($item['c']);
        }
    }

    /**
     * Get the next job to run (can be filtered by workername and method name).
     *
     * @param string $workerName
     * @param string $methodName
     * @param bool   $prioritize
     * @param int    $runId
     *
     * @return Job|null
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        do {
            $queryBuilder = $this->getJobQueryBuilder($workerName, $methodName, $prioritize);
            $queryBuilder->select('j.id');
            $queryBuilder->setMaxResults(100);

            /** @var QueryBuilder $queryBuilder */
            $query = $queryBuilder->getQuery();
            $jobs = $query->getResult();
            if ($jobs) {
Bug Best Practice
The expression $jobs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using !empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(...) or !empty(...) instead.
                foreach ($jobs as $job) {
                    if ($job = $this->takeJob($job['id'], $runId)) {
Bug
Are you sure the assignment to $job is correct? $this->takeJob($job['id'], $runId) (which targets Dtc\QueueBundle\ORM\JobManager::takeJob()) seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }
}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is incomplete or has been reduced for debug purposes.
                        return $job;
                    }
                }
            }
        } while ($jobs);
Bug Best Practice
The expression $jobs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using !empty($expr) instead to make it clear that you intend to check for an array without elements.

        return null;
    }

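
The array-to-boolean warnings on getJob() come down to PHP's loose comparison rules. The snippet below is a small standalone illustration of that behavior and of the explicit alternative the check recommends. `hasRows()` is a hypothetical helper introduced only for this example.

```php
<?php

// Hypothetical helper: makes the "any rows left?" check explicit.
function hasRows(array $rows): bool
{
    return !empty($rows);
}

var_dump([] == false);            // bool(true): an empty array is loosely equal to false
var_dump([] === false);           // bool(false): but it is not identical to false
var_dump(hasRows([]));            // bool(false)
var_dump(hasRows([['id' => 7]])); // bool(true)
```

Both forms behave the same for arrays; the explicit version only makes the intent visible to the reader and to static analysis.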
    /**
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @return QueryBuilder
     */
    public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $queryBuilder = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($queryBuilder);
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);

        if ($prioritize) {
            $queryBuilder->addOrderBy('j.priority', 'DESC');
            $queryBuilder->addOrderBy('j.whenUs', 'ASC');
        } else {
            $queryBuilder->orderBy('j.whenUs', 'ASC');
        }

        return $queryBuilder;
    }

    protected function addStandardPredicate(QueryBuilder $queryBuilder, $status = BaseJob::STATUS_NEW)
    {
        $dateTime = Util::getMicrotimeDateTime();
        $decimal = Util::getMicrotimeDecimalFormat($dateTime);

        // where() replaces any conditions already on the builder, so callers
        // should add further criteria only after this method has run.
        $queryBuilder
            ->where('j.status = :status')->setParameter(':status', $status)
            ->andWhere($queryBuilder->expr()->orX(
                $queryBuilder->expr()->isNull('j.whenUs'),
                $queryBuilder->expr()->lte('j.whenUs', ':whenUs')
            ))
            ->andWhere($queryBuilder->expr()->orX(
                $queryBuilder->expr()->isNull('j.expiresAt'),
                $queryBuilder->expr()->gt('j.expiresAt', ':expiresAt')
            ))
            ->setParameter(':whenUs', $decimal)
            ->setParameter(':expiresAt', $dateTime);
    }

    /**
     * @param int|null $runId
     *
     * @return Job|null The job if it was claimed by this call, null otherwise
     */
    protected function takeJob($jobId, $runId = null)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $repository->createQueryBuilder('j');
        $queryBuilder
            ->update()
            ->set('j.status', ':status')
            ->setParameter(':status', BaseJob::STATUS_RUNNING);
        if (null !== $runId) {
            $queryBuilder
                ->set('j.runId', ':runId')
                ->setParameter(':runId', $runId);
        }
        $queryBuilder->set('j.startedAt', ':startedAt')
            ->setParameter(':startedAt', Util::getMicrotimeDateTime());
        $queryBuilder->where('j.id = :id');
        $queryBuilder->setParameter(':id', $jobId);
        $resultCount = $queryBuilder->getQuery()->execute();

        if (1 === $resultCount) {
            return $repository->find($jobId);
        }

        return null;
    }

    /**
     * Tries to update the nearest job as a batch.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return null|Job
     */
    public function updateNearestBatch(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new UnsupportedException('$job must be instance of '.Job::class);
        }

        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $this->getRepository()->createQueryBuilder('j');
        $queryBuilder->select()
            ->where('j.crcHash = :crcHash')
            ->andWhere('j.status = :status')
            ->setParameter(':status', BaseJob::STATUS_NEW)
            ->setParameter(':crcHash', $job->getCrcHash())
            ->orderBy('j.whenUs', 'ASC')
            ->setMaxResults(1);
        $existingJobs = $queryBuilder->getQuery()->execute();

        if (empty($existingJobs)) {
            return null;
        }

        /** @var Job $existingJob */
        $existingJob = $existingJobs[0];

        $newPriority = max($job->getPriority(), $existingJob->getPriority());
        $newWhenUs = $existingJob->getWhenUs();
        $bcResult = bccomp($job->getWhenUs(), $existingJob->getWhenUs());
        if ($bcResult < 0) {
            $newWhenUs = $job->getWhenUs();
        }

        $this->updateBatchJob($existingJob, $newPriority, $newWhenUs);

        return $existingJob;
    }

    /**
     * @param int    $newPriority
     * @param string $newWhenUs
     */
    protected function updateBatchJob(Job $existingJob, $newPriority, $newWhenUs)
    {
        $existingPriority = $existingJob->getPriority();
        $existingWhenUs = $existingJob->getWhenUs();

        if ($newPriority !== $existingPriority || $newWhenUs !== $existingWhenUs) {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            /** @var QueryBuilder $queryBuilder */
            $queryBuilder = $repository->createQueryBuilder('j');
            $queryBuilder->update();
            if ($newPriority !== $existingPriority) {
                $existingJob->setPriority($newPriority);
                $queryBuilder->set('j.priority', ':priority')
                    ->setParameter(':priority', $newPriority);
            }
            if ($newWhenUs !== $existingWhenUs) {
                $existingJob->setWhenUs($newWhenUs);
                $queryBuilder->set('j.whenUs', ':whenUs')
                    ->setParameter(':whenUs', $newWhenUs);
            }
            $queryBuilder->where('j.id = :id');
            $queryBuilder->setParameter(':id', $existingJob->getId());
            $queryBuilder->getQuery()->execute();
        }

        return $existingJob;
    }

    public function getWorkersAndMethods($status = BaseJob::STATUS_NEW)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $queryBuilder = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($queryBuilder, $status);
        $queryBuilder
            ->select('DISTINCT j.workerName, j.method');

        $results = $queryBuilder->getQuery()->getArrayResult();
        if (empty($results)) {
            return [];
        }
        $workerMethods = [];
        foreach ($results as $result) {
            $workerMethods[$result['workerName']][] = $result['method'];
        }

        return $workerMethods;
    }

    /**
     * @param string $workerName
     * @param string $methodName
     */
    public function countLiveJobs($workerName = null, $methodName = null)
    {
        /** @var EntityRepository $repository */
        $repository = $this->getRepository();
        $queryBuilder = $repository->createQueryBuilder('j');
        $this->addStandardPredicate($queryBuilder);
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
        $queryBuilder->select('count(j.id)');

        return $queryBuilder->getQuery()->getSingleScalarResult();
    }

    /**
     * @param string        $workerName
     * @param string        $methodName
     * @param callable|null $progressCallback
     */
    public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null)
    {
        // First mark all Live non-running jobs as Archive
        $repository = $this->getRepository();
        /** @var QueryBuilder $queryBuilder */
        $queryBuilder = $repository->createQueryBuilder('j');
        $queryBuilder->update($this->getJobClass(), 'j')
            ->set('j.status', ':statusArchive')
            ->setParameter(':statusArchive', Job::STATUS_ARCHIVE);
        $this->addStandardPredicate($queryBuilder);
        $this->addWorkerNameCriterion($queryBuilder, $workerName, $methodName);
        $resultCount = $queryBuilder->getQuery()->execute();

        if ($resultCount) {
            $this->runArchive($workerName, $methodName, $progressCallback);
        }
    }

    /**
     * Move jobs in 'archive' status to the archive table.
     *
     *  This is a bit of a hack to run a lower level query so as to process the INSERT INTO SELECT
     *   All on the server as "INSERT INTO SELECT" is not supported natively in Doctrine.
     *
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback
     */
    protected function runArchive($workerName = null, $methodName = null, callable $progressCallback = null)
Unused Code
The parameter $workerName is not used and could be removed.

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Unused Code
The parameter $methodName is not used and could be removed.
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->getObjectManager();
        $count = 0;
        do {
            /** @var EntityRepository $repository */
            $repository = $this->getRepository();
            $queryBuilder = $repository->createQueryBuilder('j');
            $queryBuilder->where('j.status = :status')
                ->setParameter(':status', Job::STATUS_ARCHIVE)
                ->setMaxResults(10000);

            $results = $queryBuilder->getQuery()->getArrayResult();
            foreach ($results as $jobRow) {
                $job = $repository->find($jobRow['id']);
                if ($job) {
                    $entityManager->remove($job);
                }
                ++$count;
                if (0 == $count % 10) {
                    $this->flush();
                    $this->updateProgress($progressCallback, $count);
                }
            }
            $this->flush();
            $this->updateProgress($progressCallback, $count);
        } while (!empty($results) && 10000 == count($results));
    }
}