Completed
Pull Request: master (#53) by Matthew, created 12:05

DoctrineJobManager   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 387
Duplicated Lines 3.88 %

Coupling/Cohesion

Components 1
Dependencies 10

Test Coverage

Coverage 98.41%

Importance

Changes 0
Metric  Value
wmc     56
lcom    1
cbo     10
dl      15
loc     387
ccs     186
cts     189
cp      0.9841
rs      6.5957
c       0
b       0
f       0

26 Methods

Rating  Name                            Duplication  Size  Complexity
        countJobsByStatus()             0            1     ?
A       resetExceptionJobs()            0            18    2
        updateExpired()                 0            1     ?
A       getStalledJobs()                0            11    1
A       runStalledLoop()                0            14    4
A       resetStalledJobs()              0            19    4
A       addWorkerNameMethod()           0            9     3
B       pruneExpiredJobs()              0            34    6
B       findRunningJobs()               0            23    5
A       extractStalledLiveRuns()        0            13    4
A       extractStalledJobs()            0            14    4
B       extractStalledJobsRunArchive()  0            22    6
B       pruneStalledJobs()              0            22    4
A       stallableSaveHistory()          0            8     2
A       stallableSave()                 0            20    3
        updateNearestBatch()            0            1     ?
        stopIdGenerator()               0            1     ?
        restoreIdGenerator()            0            1     ?
B       resetJobsByCriterion()          0            28    3
A       resetSaveOk()                   0            3     1
        getJobQueryBuilder()            0            1     ?
A       resetArchiveJob()               0            19    2
A       resetJob()                      15           15    2
        getWorkersAndMethods()          0            1     ?
        countLiveJobs()                 0            1     ?
        archiveAllJobs()                0            1     ?

How to fix: Duplicated Code, Complexity

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule is to restructure code once it is duplicated in three or more places.

Common duplication problems have corresponding standard solutions, such as extracting the repeated code into a single class or operation.
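One duplication visible in this class is the identifier-column lookup, which appears in the same form in pruneExpiredJobs(), findRunningJobs() and resetJobsByCriterion(). A minimal sketch of pulling it into one place might look like the following. The function name resolveIdColumn is hypothetical and not part of the bundle; it takes the array that $metadata->getIdentifier() returns in the original code.

```php
<?php

/**
 * Hypothetical helper (not part of the bundle): returns the first identifier
 * field name, falling back to 'id' when none is defined.
 *
 * @param array $identifierData Result of $metadata->getIdentifier() in the original code
 */
function resolveIdColumn(array $identifierData): string
{
    return isset($identifierData[0]) ? $identifierData[0] : 'id';
}

// Each of the three call sites could then shrink to a single line, e.g.:
// $idColumn = resolveIdColumn($metadata->getIdentifier());
```

Whether this belongs in a free function, a trait, or a protected method on the base manager is a design choice the report does not settle.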

Complex Class

Tip: Before tackling complexity, make sure you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like DoctrineJobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes. You can also look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
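In this class, the methods with "Stalled" in their names are one candidate group for Extract Class. The sketch below is a simplified illustration under stated assumptions, not the bundle's code: the class name StalledJobFinder and the injected callable are hypothetical, and the per-job comparison against the run's current job id from extractStalledLiveRuns() is reduced to a single per-run decision.

```php
<?php

// Hypothetical class (not part of the bundle) grouping the "stalled" logic.
final class StalledJobFinder
{
    /** @var callable Receives a run id and returns true when that run counts as stalled */
    private $isRunStalled;

    public function __construct(callable $isRunStalled)
    {
        $this->isRunStalled = $isRunStalled;
    }

    /**
     * @param array $runningJobsById Jobs grouped by run id, as built by findRunningJobs()
     *
     * @return array Flat list of stalled jobs
     */
    public function extractStalledJobs(array $runningJobsById): array
    {
        $stalledJobs = [];
        foreach ($runningJobsById as $runId => $jobs) {
            // Jobs without a run id are always treated as stalled, as in the original check.
            $hasNoRun = !$runId && 0 !== $runId;
            if ($hasNoRun || ($this->isRunStalled)($runId)) {
                $stalledJobs = array_merge($stalledJobs, $jobs);
            }
        }

        return $stalledJobs;
    }
}
```

The manager would then delegate to such an object instead of carrying the extraction methods itself, which is what lowers its method count and complexity.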

While breaking up the class, it is a good idea to analyze how other classes use DoctrineJobManager, and based on these observations, apply Extract Interface, too.
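As a rough illustration of Extract Interface, suppose some callers only ever reset jobs. The interface name JobResetterInterface, the resetAll() function and the stand-in implementation below are all hypothetical; only the two method names and their parameters are taken from DoctrineJobManager.

```php
<?php

// Hypothetical interface: only the public methods a "reset" caller would need.
interface JobResetterInterface
{
    public function resetExceptionJobs($workerName = null, $method = null);

    public function resetStalledJobs($workerName = null, $method = null, ?callable $progressCallback = null);
}

// Hypothetical caller that depends on the interface rather than on DoctrineJobManager.
function resetAll(JobResetterInterface $resetter, $workerName = null): int
{
    return $resetter->resetExceptionJobs($workerName)
        + $resetter->resetStalledJobs($workerName);
}

// Stand-in implementation used only to show the wiring; a real one would be the job manager.
final class FixedCountResetter implements JobResetterInterface
{
    public function resetExceptionJobs($workerName = null, $method = null)
    {
        return 2;
    }

    public function resetStalledJobs($workerName = null, $method = null, ?callable $progressCallback = null)
    {
        return 3;
    }
}
```

Callers typed against the narrow interface keep working if the stalled-job or reset logic later moves out of the manager.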

<?php

namespace Dtc\QueueBundle\Doctrine;

use Doctrine\ODM\MongoDB\DocumentRepository;
use Doctrine\ORM\EntityRepository;
use Dtc\QueueBundle\Model\BaseJob;
use Dtc\QueueBundle\Model\RetryableJob;
use Dtc\QueueBundle\Model\Job;
use Dtc\QueueBundle\Model\JobTiming;
use Dtc\QueueBundle\Model\StallableJob;
use Dtc\QueueBundle\Model\Run;
use Dtc\QueueBundle\Util\Util;

abstract class DoctrineJobManager extends BaseDoctrineJobManager
{
    use ProgressCallbackTrait;

    /** Number of seconds before a job is considered stalled if the runner is no longer active */
    const STALLED_SECONDS = 1800;

    /**
     * @param string $objectName
     */
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);

    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);

        $criterion = ['status' => Job::STATUS_EXCEPTION];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $saveCount = $this->getSaveCount($count);
        $countProcessed = 0;
        for ($i = 0; $i < $count; $i += $saveCount) {
            $countProcessed += $this->resetJobsByCriterion(
                $criterion,
                $saveCount,
                $i
            );
        }

        return $countProcessed;
    }

    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed
     */
    abstract protected function updateExpired($workerName = null, $method = null);

    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        if (null !== $workerName) {
            $criterion['workerName'] = $workerName;
        }
        if (null !== $method) {
            $criterion['method'] = $method;
        }
    }

    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;

        $metadata = $this->getObjectManager()->getClassMetadata($this->getJobClass());
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';

        $fetchCount = $this->getFetchCount($count);
        for ($i = 0; $i < $count; $i += $fetchCount) {
            $expiredJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, $i);
            $innerCount = 0;
            if (!empty($expiredJobs)) {
                foreach ($expiredJobs as $expiredJob) {
                    /** @var Job $expiredJob */
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
                    $objectManager->remove($expiredJob);
                    ++$finalCount;
                    ++$innerCount;
                }
            }
            $this->flush();
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
        }

        return $finalCount;
    }

    protected function getStalledJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);

        $criterion = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $runningJobs = $this->findRunningJobs($criterion, $count);

        return $this->extractStalledJobs($runningJobs);
    }

    protected function findRunningJobs($criterion, $count)
    {
        $repository = $this->getRepository();
        $runningJobsById = [];

        $metadata = $this->getObjectManager()->getClassMetadata($this->getJobClass());
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';

        $fetchCount = $this->getFetchCount($count);
        for ($i = 0; $i < $count; $i += $fetchCount) {
            $runningJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, $i);
            if (!empty($runningJobs)) {
                foreach ($runningJobs as $job) {
                    /** @var StallableJob $job */
                    $runId = $job->getRunId();
                    $runningJobsById[$runId][] = $job;
                }
            }
        }

        return $runningJobsById;
    }

    /**
     * @param $runId
     * @param array $jobs
     * @param array $stalledJobs
     */
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
    {
        $objectManager = $this->getObjectManager();
        $runRepository = $objectManager->getRepository($this->getRunManager()->getRunClass());
        if ($run = $runRepository->find($runId)) {
            foreach ($jobs as $job) {
                if ($run->getCurrentJobId() == $job->getId()) {
                    continue;
                }
                $stalledJobs[] = $job;
            }
        }
    }

    /**
     * @param array $runningJobsById
     *
     * @return array
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $stalledJobs = [];
        foreach (array_keys($runningJobsById) as $runId) {
            if (!$runId && 0 !== $runId) {
                $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
                continue;
            }
            $this->extractStalledLiveRuns($runId, $runningJobsById[$runId], $stalledJobs);
            $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId);
        }

        return $stalledJobs;
    }

    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
    {
        $runManager = $this->getRunManager();
        if (!method_exists($runManager, 'getObjectManager')) {
            return;
        }
        if (!method_exists($runManager, 'getRunArchiveClass')) {
            return;
        }

        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
        $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());
        /** @var Run $run */
        if ($run = $runArchiveRepository->find($runId)) {
            if ($endTime = $run->getEndedAt()) {
                // Did the run end more than STALLED_SECONDS (1800 seconds, i.e. 30 minutes) ago?
                if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
                    $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
                }
            }
        }
    }

    /**
     * @param int   $i
     * @param int   $count
     * @param int   $saveCount
     * @param array $stalledJobs
     *
     * @return int
     */
    protected function runStalledLoop($i, $count, $saveCount, array $stalledJobs)
    {
        $resetCount = 0;
        for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $count; ++$j) {
            /** @var StallableJob $job */
            $job = $stalledJobs[$j];
            $job->setStatus(StallableJob::STATUS_STALLED);
            if ($this->saveHistory($job)) {
                ++$resetCount;
            }
        }

        return $resetCount;
    }

    public function resetStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $stalledJobsCount = count($stalledJobs);
        $this->updateProgress($progressCallback, 0, $stalledJobsCount);
        $countReset = 0;
        $saveCount = $this->getSaveCount($stalledJobsCount);
        for ($i = 0; $i < $stalledJobsCount; $i += $saveCount) {
            $resetCount = $this->runStalledLoop($i, $stalledJobsCount, $saveCount, $stalledJobs);
            for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $stalledJobsCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }
            $countReset += $resetCount;
            $this->flush();
            $this->updateProgress($progressCallback, $countReset, $stalledJobsCount);
        }

        return $countReset;
    }

    /**
     * @param string|null   $workerName
     * @param string|null   $method
     * @param callable|null $progressCallback
     *
     * @return int Number of stalled jobs pruned
     */
    public function pruneStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $stalledJobsCount = count($stalledJobs);
        $this->updateProgress($progressCallback, 0, $stalledJobsCount);
        $countProcessed = 0;
        $saveCount = $this->getSaveCount($stalledJobsCount);
        for ($i = 0; $i < $stalledJobsCount; $i += $saveCount) {
            for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $stalledJobsCount; ++$j) {
                /** @var StallableJob $job */
                $job = $stalledJobs[$j];
                $job->setStatus(StallableJob::STATUS_STALLED);
                $job->setStalls(intval($job->getStalls()) + 1);
                $this->deleteJob($job);
                ++$countProcessed;
            }
            $this->flush();
            $this->updateProgress($progressCallback, $countProcessed, $stalledJobsCount);
        }

        return $countProcessed;
    }

    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if (!$retry) {
            $this->deleteJob($job);
        }

        return $retry;
    }

    protected function stallableSave(StallableJob $job)
    {
        // Generate a SHA-256 hash for the job (stored through setCrcHash)
        $hashValues = array(get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs());
        $crcHash = hash('sha256', serialize($hashValues));
        $job->setCrcHash($crcHash);

        if (true === $job->getBatch()) {
            $oldJob = $this->updateNearestBatch($job);
            if ($oldJob) {
                return $oldJob;
            }
        }

        // Just save a new job
        $this->resetSaveOk(__FUNCTION__);
        $this->persist($job);

        return $job;
    }

    abstract protected function updateNearestBatch(Job $job);

    /**
     * @param string $objectName
     */
    abstract protected function stopIdGenerator($objectName);

    abstract protected function restoreIdGenerator($objectName);

    /**
     * @param array $criterion
     * @param int   $limit
     * @param int   $offset
     *
     * @return int Number of jobs reset
     */
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset
    ) {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getJobClass();
        $archiveObjectName = $this->getJobArchiveClass();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $this->stopIdGenerator($objectName);
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
        $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);
        $countProcessed = 0;

        foreach ($results as $jobArchive) {
            $countProcessed += $this->resetArchiveJob($jobArchive);
        }
        $objectManager->flush();

        $this->restoreIdGenerator($objectName);

        return $countProcessed;
    }

    protected function resetSaveOk($function)
    {
    }

    /**
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     */
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);

    /**
     * @param StallableJob $jobArchive
     *
     * @return int Number of jobs reset
     */
    protected function resetArchiveJob(StallableJob $jobArchive)
    {
        $objectManager = $this->getObjectManager();
        if ($this->updateMaxStatus($jobArchive, StallableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) {
            $objectManager->persist($jobArchive);

            return 0;
        }

        $className = $this->getJobClass();
        /** @var StallableJob $newJob */
        $newJob = new $className();
        Util::copy($jobArchive, $newJob);
        $this->resetJob($newJob);
        $objectManager->remove($jobArchive);
        $this->flush();

        return 1;
    }

    // Duplication: this method seems to be duplicated in your project.
    protected function resetJob(RetryableJob $job)
    {
        if (!$job instanceof StallableJob) {
            throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
        }
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setFinishedAt(null);
        $job->setStartedAt(null);
        $job->setElapsed(null);
        $job->setRetries($job->getRetries() + 1);
        $this->persist($job);

        return true;
    }

    abstract public function getWorkersAndMethods();

    abstract public function countLiveJobs($workerName = null, $methodName = null);

    abstract public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null);
}