Completed
Pull Request — master (#27)
by Matthew
20:41 queued 16:25
created

BaseJobManager   D

Complexity

Total Complexity 59

Size/Duplication

Total Lines 422
Duplicated Lines 0 %

Test Coverage

Coverage 99.53%

Importance

Changes 0
Metric Value
wmc 59
dl 0
loc 422
rs 4.5454
c 0
b 0
f 0
ccs 214
cts 215
cp 0.9953

23 Methods

Rating   Name   Duplication   Size   Complexity  
A resetStalledJobs() 0 15 3
B findRunningJobs() 0 18 5
A addWorkerNameMethod() 0 7 3
B resetJob() 0 25 2
B resetJobsByCriterion() 0 27 3
A extractStalledJobs() 0 9 2
A resetErroneousJobs() 0 14 2
A __construct() 0 7 1
A getRepository() 0 3 1
A deleteJob() 0 5 1
B pruneExpiredJobs() 0 27 5
B extractStalledJobsRunArchive() 0 18 6
A flush() 0 3 1
A updateMaxStatus() 0 9 3
A extractStalledLiveRuns() 0 10 4
A getObjectManager() 0 3 1
A prioritySave() 0 21 3
A saveHistory() 0 3 1
A pruneStalledJobs() 0 21 4
A resetSaveOk() 0 2 1
B runStalledLoop() 0 25 5
A getStalledJobs() 0 10 1
A getJobArchiveClass() 0 3 1

How to fix   Complexity   

Complex Class

Complex classes like BaseJobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use BaseJobManager, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityRepository;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\Job;
11
use Dtc\QueueBundle\Model\JobTiming;
12
use Dtc\QueueBundle\Model\JobTimingManager;
13
use Dtc\QueueBundle\Model\PriorityJobManager;
14
use Dtc\QueueBundle\Model\RetryableJob;
15
use Dtc\QueueBundle\Model\Run;
16
use Dtc\QueueBundle\Model\RunManager;
17
use Dtc\QueueBundle\Util\Util;
18
19
abstract class BaseJobManager extends PriorityJobManager
20
{
21
    /** Number of jobs to prune / reset / gather at a time */
22
    const FETCH_COUNT = 100;
23
24
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
25
    const STALLED_SECONDS = 1800;
26
27
    /**
28
     * @var ObjectManager
29
     */
30
    protected $objectManager;
31
    /**
32
     * @var string
33
     */
34
    protected $jobArchiveClass;
35 11
36
    /**
     * Stores the Doctrine object manager and the archive class name, then hands
     * the remaining dependencies to the parent constructor.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param ObjectManager    $objectManager    returned by getObjectManager()
     * @param string           $jobClass         forwarded to the parent constructor
     * @param string           $jobArchiveClass  returned by getJobArchiveClass()
     */
    public function __construct(
        RunManager $runManager,
        JobTimingManager $jobTimingManager,
        ObjectManager $objectManager,
        $jobClass,
        $jobArchiveClass
    ) {
        $this->objectManager = $objectManager;
        $this->jobArchiveClass = $jobArchiveClass;

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
50
51 38
    /**
     * Returns the object manager that was injected through the constructor.
     *
     * @return ObjectManager
     */
    public function getObjectManager()
    {
        return $this->objectManager;
    }
58
59 36
    /**
     * Returns the job archive class name that was injected through the constructor.
     *
     * @return string
     */
    public function getJobArchiveClass()
    {
        return $this->jobArchiveClass;
    }
66
67 24
    /**
     * Returns the repository the object manager provides for the job class.
     *
     * @return ObjectRepository
     */
    public function getRepository()
    {
        $objectManager = $this->getObjectManager();

        return $objectManager->getRepository($this->getJobClass());
    }
74
75 4
    /**
     * Counts the objects of the given class that are in the given status.
     *
     * Callers in this class use the result as the upper bound when paging
     * through matching jobs in batches of FETCH_COUNT.
     *
     * @param string      $objectName class name of the job or job archive objects to count
     * @param mixed       $status     status to match (callers pass Job::STATUS_ERROR and Job::STATUS_RUNNING)
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return mixed presumably an integer count; confirm in the concrete managers
     */
78
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
79
80
    /**
     * Moves archived jobs that are in the error status back into the job queue.
     *
     * Archived jobs are handled in batches of FETCH_COUNT. Every batch is
     * fetched from offset 0: resetJob() either removes the archived job or
     * changes its status, and resetJobsByCriterion() flushes before it returns,
     * so jobs that were already handled no longer match the criterion. Paging
     * with a growing offset would skip jobs that have not been handled yet.
     *
     * @param string|null $workerName only consider jobs of this worker when given
     * @param string|null $method     only consider jobs of this method when given
     *
     * @return int number of jobs that were re-queued
     */
    public function resetErroneousJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_ERROR, $workerName, $method);

        $criterion = ['status' => Job::STATUS_ERROR];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $countProcessed = 0;
        // $i only bounds the number of batches; it must not be used as the query offset.
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $countProcessed += $this->resetJobsByCriterion($criterion, static::FETCH_COUNT, 0);
        }

        return $countProcessed;
    }
95
96
    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return mixed pruneExpiredJobs() uses this value as the number of expired jobs to page through
     */
104
    abstract protected function updateExpired($workerName = null, $method = null);
105 2
106 2
    /**
     * Adds the optional worker name and method filters to a criterion array.
     *
     * @param array       $criterion  modified in place; may gain the 'workerName' and 'method' keys
     * @param string|null $workerName skipped when null
     * @param string|null $method     skipped when null
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        $filters = ['workerName' => $workerName, 'method' => $method];
        foreach ($filters as $key => $value) {
            if (null === $value) {
                continue;
            }
            $criterion[$key] = $value;
        }
    }
115
116
    /**
     * Marks expired jobs through updateExpired() and then removes them in
     * batches of FETCH_COUNT, recording one FINISHED_EXPIRED timing per removed job.
     *
     * Every batch is fetched from offset 0 because the previous batch has
     * already been removed and flushed; a growing offset would skip jobs that
     * are still waiting to be pruned.
     *
     * NOTE(review): the property is spelled `jobTiminigManager` at every use in
     * this class; presumably that matches the declaration in the parent class.
     * Confirm before renaming.
     *
     * @param string|null $workerName only consider jobs of this worker when given
     * @param string|null $method     only consider jobs of this method when given
     *
     * @return int number of jobs that were removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;
        // $i only bounds the number of batches; the query always starts at offset 0.
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, 0);
            $innerCount = 0;
            if (!empty($expiredJobs)) {
                foreach ($expiredJobs as $expiredJob) {
                    /* @var Job $expiredJob */
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
                    $objectManager->remove($expiredJob);
                    ++$finalCount;
                    ++$innerCount;
                }
            }
            $this->flush();
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
            if (0 === $innerCount) {
                // Nothing matched any more, so further queries would come back empty as well.
                break;
            }
        }

        return $finalCount;
    }
144 3
145 3
    /**
     * Flushes the object manager.
     */
    protected function flush()
    {
        $this->getObjectManager()->flush();
    }
149
150 3
    /**
     * Collects the running jobs that are considered stalled.
     *
     * Running jobs are loaded with findRunningJobs() and then filtered by
     * extractStalledJobs().
     *
     * @param string|null $workerName only consider jobs of this worker when given
     * @param string|null $method     only consider jobs of this method when given
     *
     * @return array list of stalled jobs
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $runningCount = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);

        $filter = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($filter, $workerName, $method);

        $runningJobsById = $this->findRunningJobs($filter, $runningCount);

        return $this->extractStalledJobs($runningJobsById);
    }
161 7
162
    /**
     * Loads jobs matching the criterion in pages of FETCH_COUNT and groups them
     * by run id. Jobs whose run id is null are left out.
     *
     * @param array $criterion criterion passed to the repository's findBy()
     * @param int   $count     number of jobs to page through
     *
     * @return array map of run id => list of jobs
     */
    protected function findRunningJobs($criterion, $count)
    {
        $repository = $this->getRepository();
        $jobsByRunId = [];

        $offset = 0;
        while ($offset < $count) {
            $batch = $repository->findBy($criterion, null, static::FETCH_COUNT, $offset);
            $offset += static::FETCH_COUNT;
            if (empty($batch)) {
                continue;
            }

            foreach ($batch as $job) {
                /** @var RetryableJob $job */
                $runId = $job->getRunId();
                if (null === $runId) {
                    continue;
                }
                $jobsByRunId[$runId][] = $job;
            }
        }

        return $jobsByRunId;
    }
181 4
182
    /**
     * Handles a run that can still be found in the run repository: every job in
     * $jobs except the one the run reports as its current job is appended to
     * $stalledJobs. Nothing happens when the run cannot be found.
     *
     * @param mixed $runId       identifier of the run to look up
     * @param array $jobs        running jobs attributed to that run
     * @param array $stalledJobs list that stalled jobs are appended to (by reference)
     */
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
    {
        $runRepository = $this->getObjectManager()->getRepository($this->getRunManager()->getRunClass());
        $run = $runRepository->find($runId);
        if (!$run) {
            return;
        }

        foreach ($jobs as $job) {
            if ($run->getCurrentJobId() != $job->getId()) {
                $stalledJobs[] = $job;
            }
        }
    }
200
201
    /**
     * Runs both stalled-job checks (live run and archived run) for every run id
     * and returns the jobs they collected.
     *
     * @param array $runningJobsById map of run id => list of running jobs
     *
     * @return array list of stalled jobs
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $stalledJobs = [];
        foreach ($runningJobsById as $runId => $jobs) {
            $this->extractStalledLiveRuns($runId, $jobs, $stalledJobs);
            $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId);
        }

        return $stalledJobs;
    }
216
217
    /**
     * Handles a run that has been archived: when the archived run ended more
     * than STALLED_SECONDS ago, all running jobs attributed to it are appended
     * to $stalledJobs.
     *
     * Does nothing when the run manager lacks getObjectManager() or
     * getRunArchiveClass(), when the run is not in the archive, or when the
     * archived run has no end time.
     *
     * @param array $runningJobsById map of run id => list of running jobs
     * @param array $stalledJobs     list that stalled jobs are appended to (by reference)
     * @param mixed $runId           identifier of the run to look up in the archive
     */
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
    {
        $runManager = $this->getRunManager();
        $canReadArchive = method_exists($runManager, 'getObjectManager')
            && method_exists($runManager, 'getRunArchiveClass');
        if (!$canReadArchive) {
            return;
        }

        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
        $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());

        /** @var Run $run */
        $run = $runArchiveRepository->find($runId);
        if (!$run) {
            return;
        }

        $endTime = $run->getEndedAt();
        if (!$endTime) {
            return;
        }

        // Did the run end more than STALLED_SECONDS ago?
        if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
            $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
        }
    }
239 4
240 4
    /**
     * Sets $status on the job when a maximum is configured and has been reached.
     *
     * @param RetryableJob $job    job whose status may be changed
     * @param mixed        $status status to set when the maximum is reached
     * @param int|null     $max    maximum allowed value; null means no maximum
     * @param int          $count  current value compared against $max
     *
     * @return bool true when the status was set, false otherwise
     */
    protected function updateMaxStatus(RetryableJob $job, $status, $max = null, $count = 0)
    {
        $limitReached = null !== $max && $count >= $max;
        if ($limitReached) {
            $job->setStatus($status);
        }

        return $limitReached;
    }
250 4
251
    /**
     * Processes one batch of stalled jobs, starting at index $i and covering at
     * most FETCH_COUNT entries.
     *
     * Each job's stalled count is incremented. A job that has reached its
     * maximum stalled count or maximum retries is removed and a FINISHED_STALLED
     * timing is recorded; it is not counted. Every other job gets one more
     * retry, is reset to STATUS_NEW with its lock cleared, and is persisted.
     * The caller is responsible for flushing.
     *
     * @param int   $i              index of the first job of the batch
     * @param int   $count          total number of entries in $stalledJobs
     * @param array $stalledJobs    list of stalled jobs
     * @param int   $countProcessed running total of re-queued jobs (by reference)
     *
     * @return int number of jobs re-queued in this batch
     */
    protected function runStalledLoop($i, $count, array $stalledJobs, &$countProcessed)
    {
        $objectManager = $this->getObjectManager();
        $requeued = 0;
        $batchEnd = $i + static::FETCH_COUNT;

        for ($index = $i; $index < $batchEnd && $index < $count; ++$index) {
            /** @var RetryableJob $job */
            $job = $stalledJobs[$index];
            $job->setStalledCount($job->getStalledCount() + 1);

            $limitReached = $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount())
                || $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_RETRIES, $job->getMaxRetries(), $job->getRetries());
            if ($limitReached) {
                $objectManager->remove($job);
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
                continue;
            }

            $job->setRetries($job->getRetries() + 1);
            $job->setStatus(BaseJob::STATUS_NEW);
            $job->setLocked(null);
            $job->setLockedAt(null);
            $objectManager->persist($job);
            ++$requeued;
            ++$countProcessed;
        }

        return $requeued;
    }
277
278 2
    /**
     * Re-queues stalled jobs in batches of FETCH_COUNT.
     *
     * After each batch is flushed, a FINISHED_STALLED and an INSERT timing are
     * recorded for every job that runStalledLoop() re-queued in that batch.
     *
     * @param string|null $workerName only consider jobs of this worker when given
     * @param string|null $method     only consider jobs of this method when given
     *
     * @return int number of jobs that were re-queued
     */
    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $total = count($stalledJobs);
        $countProcessed = 0;

        for ($offset = 0; $offset < $total; $offset += static::FETCH_COUNT) {
            $requeued = $this->runStalledLoop($offset, $total, $stalledJobs, $countProcessed);
            $this->flush();

            for ($recorded = 0; $recorded < $requeued; ++$recorded) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_INSERT);
            }
        }

        return $countProcessed;
    }
294
295 2
    /**
     * Removes stalled jobs in batches of FETCH_COUNT, flushing after each batch.
     *
     * Before removal each job's stalled count is incremented, its status is set
     * to STATUS_ERROR with the message 'stalled', and the status is replaced by
     * STATUS_MAX_STALLED when the job has reached its maximum stalled count.
     *
     * @param string|null $workerName only consider jobs of this worker when given
     * @param string|null $method     only consider jobs of this method when given
     *
     * @return int number of jobs that were removed
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $objectManager = $this->getObjectManager();

        $countProcessed = 0;
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $job) {
                /** @var RetryableJob $job */
                $job->setStalledCount($job->getStalledCount() + 1);
                $job->setStatus(BaseJob::STATUS_ERROR);
                $job->setMessage('stalled');
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount());
                $objectManager->remove($job);
                ++$countProcessed;
            }
            $this->flush();
        }

        return $countProcessed;
    }
321 10
322 10
    /**
     * Removes the job through the object manager and flushes immediately.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to remove
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $manager = $this->getObjectManager();
        $manager->remove($job);
        $manager->flush();
    }
328 2
329
    /**
     * Saves the job's history by deleting the job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to hand over to deleteJob()
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        // Should cause job to be archived
        $this->deleteJob($job);
    }
333 33
334 33
    /**
     * Stores a hash identifying the job and then saves it.
     *
     * The hash is a sha256 digest of the serialized class name, method, worker
     * name and arguments, stored through setCrcHash(). For a job whose
     * getBatch() is strictly true, updateNearestBatch() is consulted first; a
     * truthy result is returned instead of persisting the new job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to save
     *
     * @return mixed the result of updateNearestBatch() when truthy, otherwise $job
     */
    protected function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        // Fingerprint the job from its class, method, worker name and arguments
        $fingerprint = serialize([$job->getClassName(), $job->getMethod(), $job->getWorkerName(), $job->getArgs()]);
        $job->setCrcHash(hash('sha256', $fingerprint));
        $objectManager = $this->getObjectManager();

        if (true === $job->getBatch()) {
            $existingJob = $this->updateNearestBatch($job);
            if ($existingJob) {
                return $existingJob;
            }
        }

        // No batch job took over, so store this one as a new job
        $this->resetSaveOk(__FUNCTION__);
        $objectManager->persist($job);
        $objectManager->flush();

        return $job;
    }
356
357
    /**
     * Hook consulted by prioritySave() for jobs whose getBatch() is true.
     *
     * When the implementation returns a truthy value, prioritySave() returns
     * that value instead of persisting $job.
     *
     * @param Job $job the job that is about to be saved
     *
     * @return mixed
     */
    abstract protected function updateNearestBatch(Job $job);
358
359
    /**
     * Called by resetJobsByCriterion() with the job class name before archived
     * jobs are copied into new job objects and persisted.
     *
     * NOTE(review): presumably disables automatic identifier generation so the
     * re-created jobs keep the identifiers copied from the archive. Confirm in
     * the concrete managers.
     *
     * @param string $objectName
     */
362
    abstract protected function stopIdGenerator($objectName);
363
364
    /**
     * Called by resetJobsByCriterion() with the job class name after the
     * re-created jobs have been flushed; the counterpart of stopIdGenerator().
     *
     * @param string $objectName
     */
    abstract protected function restoreIdGenerator($objectName);
365
366
    /**
     * Re-queues one page of archived jobs that match the criterion.
     *
     * The page is read from the job archive repository ordered by the job
     * class's first identifier field (falling back to 'id'), each archived job
     * is passed to resetJob(), and the object manager is flushed once at the end.
     *
     * stopIdGenerator() is called before the page is processed and
     * restoreIdGenerator() is guaranteed to run afterwards, even when
     * resetJob() or the flush throws; the exception still propagates.
     *
     * @param array $criterion criterion passed to the archive repository's findBy()
     * @param int   $limit     maximum number of archived jobs to load
     * @param int   $offset    offset of the first archived job to load
     *
     * @return int number of archived jobs that were re-queued
     */
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset)
    {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getJobClass();
        $archiveObjectName = $this->getJobArchiveClass();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $countProcessed = 0;

        $this->stopIdGenerator($objectName);
        try {
            $identifierData = $metadata->getIdentifier();
            $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
            $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

            foreach ($results as $jobArchive) {
                $this->resetJob($jobArchive, $className, $countProcessed);
            }
            $objectManager->flush();
        } finally {
            // Never leave the id generator stopped, whatever happened above.
            $this->restoreIdGenerator($objectName);
        }

        return $countProcessed;
    }
399
400
    /**
     * Hook called with the calling method's name before prioritySave() and
     * resetJobsByCriterion() persist jobs. This implementation does nothing.
     *
     * @param string $function name of the calling method (callers pass __FUNCTION__);
     *                         not used by this implementation
     */
    protected function resetSaveOk($function)
    {
    }
403
404
    /**
     * Builds the query used to look up jobs, optionally restricted to a worker
     * name and method name.
     *
     * NOTE(review): the return type is not visible here; presumably a
     * query-builder object specific to the concrete manager. Confirm there.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $methodName optional method name filter
     * @param bool        $prioritize
     */
409
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
410
411
    /**
     * Turns one archived job back into a new job, unless its retries are used up.
     *
     * When the archived job has reached its maximum retries, its status is set
     * to STATUS_MAX_RETRIES, it is persisted, and nothing else happens.
     * Otherwise a new $className instance is filled from the archive through
     * Util::copy(), its run state is cleared, its retry count is incremented,
     * it is persisted, the archived job is removed, and an INSERT timing is
     * recorded. The caller is responsible for flushing.
     *
     * @param RetryableJob $jobArchive     archived job to reset
     * @param string       $className      class to instantiate for the new job
     * @param int          $countProcessed incremented for each re-queued job (by reference)
     */
    protected function resetJob(RetryableJob $jobArchive, $className, &$countProcessed)
    {
        $objectManager = $this->getObjectManager();

        $retriesUsedUp = $this->updateMaxStatus(
            $jobArchive,
            RetryableJob::STATUS_MAX_RETRIES,
            $jobArchive->getMaxRetries(),
            $jobArchive->getRetries()
        );
        if ($retriesUsedUp) {
            $objectManager->persist($jobArchive);

            return;
        }

        /** @var RetryableJob $job */
        $job = new $className();
        Util::copy($jobArchive, $job);

        // Clear everything left over from the previous run
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setLocked(null);
        $job->setLockedAt(null);
        $job->setMessage(null);
        $job->setFinishedAt(null);
        $job->setStartedAt(null);
        $job->setElapsed(null);
        $job->setRetries($job->getRetries() + 1);

        $objectManager->persist($job);
        $objectManager->remove($jobArchive);
        $this->jobTiminigManager->recordTiming(JobTiming::STATUS_INSERT);
        ++$countProcessed;
    }
442
}
443