Passed
Push — master ( 82017d...f35517 )
by Matthew
08:19
created

DoctrineJobManager   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 190
Duplicated Lines 0 %

Test Coverage

Coverage 98.88%

Importance

Changes 0
Metric Value
eloc 88
c 0
b 0
f 0
dl 0
loc 190
ccs 88
cts 89
cp 0.9888
rs 10
wmc 23

7 Methods

Rating   Name   Duplication   Size   Complexity  
B pruneExpiredJobs() 0 33 6
A resetStalledJobs() 0 18 4
A resetExceptionJobs() 0 17 2
A pruneStalledJobs() 0 21 4
A resetJob() 0 14 2
A stallableSaveHistory() 0 7 2
A stallableSave() 0 18 3
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Dtc\QueueBundle\Model\BaseJob;
6
use Dtc\QueueBundle\Model\Job;
7
use Dtc\QueueBundle\Model\JobTiming;
8
use Dtc\QueueBundle\Model\RetryableJob;
9
use Dtc\QueueBundle\Model\StallableJob;
10
11
abstract class DoctrineJobManager extends BaseDoctrineJobManager
12
{
13
    use ProgressCallbackTrait;
14
    use StalledTrait;
15
16
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
17
    const STALLED_SECONDS = 1800;
18
19
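    /**
     * Counts the jobs of the given class that have the given status.
     *
     * @param string      $objectName class to count in (resetExceptionJobs() passes the job archive class)
     * @param mixed       $status     a job status constant such as Job::STATUS_EXCEPTION
     * @param string|null $workerName optional worker name, as received by resetExceptionJobs()
     * @param string|null $method     optional method, as received by resetExceptionJobs()
     *
     * @return int job count; resetExceptionJobs() uses it as its loop bound
     */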
22
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
23
24 2
    /**
     * Resets jobs whose status is Job::STATUS_EXCEPTION, working through them in batches.
     *
     * The total is counted in the job archive class; each batch is delegated to
     * resetJobsByCriterion(), which receives the criterion, the batch size and the
     * running position.
     *
     * @param string|null $workerName optional worker name added to the criterion
     * @param string|null $method     optional method added to the criterion
     *
     * @return int sum of the values returned by resetJobsByCriterion()
     */
    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $filter = ['status' => Job::STATUS_EXCEPTION];

        $total = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);
        $this->addWorkerNameMethod($filter, $workerName, $method);
        $batchSize = $this->getSaveCount($total);

        $resetTotal = 0;
        $position = 0;
        while ($position < $total) {
            $resetTotal += $this->resetJobsByCriterion($filter, $batchSize, $position);
            $position += $batchSize;
        }

        return $resetTotal;
    }
42
43
    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that have expired.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed pruneExpiredJobs() treats this value as the number of jobs to prune
     */
51
    abstract protected function updateExpired($workerName = null, $method = null);
52
53 3
    /**
     * Marks expired jobs through updateExpired() and then removes them in batches.
     *
     * Each batch is fetched ordered by the identifier column, removed through the object
     * manager and flushed; one JobTiming::STATUS_FINISHED_EXPIRED timing is recorded per
     * removed job after the flush.
     *
     * @param string|null $workerName optional worker name added to the criterion
     * @param string|null $method     optional method added to the criterion
     *
     * @return int number of jobs handed to the object manager for removal
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;

        $metadata = $objectManager->getClassMetadata($this->getJobClass());
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';

        $fetchCount = $this->getFetchCount($count);
        for ($i = 0; $i < $count; $i += $fetchCount) {
            // Always read from offset 0. The previous batch has been removed and flushed, so
            // the rows still matching the criterion have shifted down; paging with a growing
            // offset would skip $fetchCount of them on every pass.
            // NOTE(review): relies on flush() committing the removals - confirm.
            $expiredJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, 0);
            $innerCount = 0;
            foreach ($expiredJobs as $expiredJob) {
                /* @var Job $expiredJob */
                $expiredJob->setStatus(Job::STATUS_EXPIRED);
                $objectManager->remove($expiredJob);
                ++$finalCount;
                ++$innerCount;
            }
            $this->flush();
            if (0 === $innerCount) {
                // Nothing left to prune; further passes would only repeat the empty query.
                break;
            }
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
        }

        return $finalCount;
    }
87
88 2
    /**
     * Resets the jobs returned by getStalledJobs(), one batch at a time.
     *
     * For every batch the reset itself is delegated to runStalledLoop(), one
     * JobTiming::STATUS_FINISHED_STALLED timing is recorded per job position in the batch,
     * the changes are flushed and progress is reported through updateProgress().
     *
     * @param string|null   $workerName       passed to getStalledJobs()
     * @param string|null   $method           passed to getStalledJobs()
     * @param callable|null $progressCallback passed to updateProgress() with the running and total counts
     *
     * @return int sum of the values returned by runStalledLoop()
     */
    public function resetStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $jobs = $this->getStalledJobs($workerName, $method);
        $total = count($jobs);
        $this->updateProgress($progressCallback, 0, $total);

        $batchSize = $this->getSaveCount($total);
        $resetTotal = 0;
        $offset = 0;
        while ($offset < $total) {
            $resetTotal += $this->runStalledLoop($offset, $total, $batchSize, $jobs);

            // One timing entry per position in this batch, capped at the end of the list.
            $batchEnd = min($offset + $batchSize, $total);
            for ($position = $offset; $position < $batchEnd; ++$position) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }

            $this->flush();
            $this->updateProgress($progressCallback, $resetTotal, $total);
            $offset += $batchSize;
        }

        return $resetTotal;
    }
107
108
    /**
     * Deletes the jobs returned by getStalledJobs(), one batch at a time.
     *
     * Each job is set to StallableJob::STATUS_STALLED, has its stall counter incremented
     * and is passed to deleteJob(); after every batch the changes are flushed and progress
     * is reported through updateProgress().
     *
     * @param string|null   $workerName       passed to getStalledJobs()
     * @param string|null   $method           passed to getStalledJobs()
     * @param callable|null $progressCallback passed to updateProgress() with the running and total counts
     *
     * @return int number of jobs passed to deleteJob()
     */
    public function pruneStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $jobs = $this->getStalledJobs($workerName, $method);
        $total = count($jobs);
        $this->updateProgress($progressCallback, 0, $total);

        $batchSize = $this->getSaveCount($total);
        $pruned = 0;
        $offset = 0;
        while ($offset < $total) {
            $batchEnd = min($offset + $batchSize, $total);
            for ($position = $offset; $position < $batchEnd; ++$position) {
                /** @var StallableJob $stalled */
                $stalled = $jobs[$position];
                $stalled->setStatus(StallableJob::STATUS_STALLED);
                $stalled->setStalls((int) $stalled->getStalls() + 1);
                $this->deleteJob($stalled);
                ++$pruned;
            }

            $this->flush();
            $this->updateProgress($progressCallback, $pruned, $total);
            $offset += $batchSize;
        }

        return $pruned;
    }
134
135 10
    /**
     * Deletes the job through deleteJob() unless it is being retried.
     *
     * @param StallableJob $job   the job to delete when no retry is requested
     * @param mixed        $retry truthy to keep the job, falsy to delete it
     *
     * @return mixed the $retry value, unchanged
     */
    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if ($retry) {
            return $retry;
        }

        $this->deleteJob($job);

        return $retry;
    }
143
144 43
    /**
     * Stamps the job with its hash and persists it, unless a batch update takes its place.
     *
     * The hash is the sha256 digest of the serialized job class, method, worker name and
     * args, stored through setCrcHash(). When getBatch() is strictly true,
     * updateNearestBatch() is consulted first and a truthy result is returned instead of
     * persisting $job.
     *
     * @param StallableJob $job the job to save
     *
     * @return mixed the result of updateNearestBatch() when truthy, otherwise $job
     */
    protected function stallableSave(StallableJob $job)
    {
        // Digest over everything that identifies the job
        $job->setCrcHash(hash('sha256', serialize([
            get_class($job),
            $job->getMethod(),
            $job->getWorkerName(),
            $job->getArgs(),
        ])));

        $batched = (true === $job->getBatch()) ? $this->updateNearestBatch($job) : null;
        if ($batched) {
            return $batched;
        }

        // No batch took over: store the job as a new one
        $this->persist($job);

        return $job;
    }
163
164
    /**
     * Hook invoked by stallableSave() when the job's getBatch() is strictly true.
     *
     * A truthy return value is returned by stallableSave() in place of persisting $job;
     * a falsy one lets the normal persist proceed.
     *
     * @param Job $job the job being saved
     */
    abstract protected function updateNearestBatch(Job $job);
165
166
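    /**
     * NOTE(review): not called from the methods declared in this class body; presumably
     * paired with restoreIdGenerator() - confirm against the concrete managers.
     *
     * @param string $objectName
     */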
169
    abstract protected function stopIdGenerator($objectName);
170
171
    /**
     * NOTE(review): not called from the methods declared in this class body; presumably the
     * counterpart of stopIdGenerator() - confirm against the concrete managers.
     *
     * @param string $objectName presumably the same kind of value stopIdGenerator() takes - confirm
     */
    abstract protected function restoreIdGenerator($objectName);
172
173
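    /**
     * NOTE(review): the return type is not documented; presumably a query builder for
     * selecting jobs - confirm against the concrete managers.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     */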
178
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
179
180 8
    /**
     * Puts a job back into the new state and counts the attempt as a retry.
     *
     * Status becomes BaseJob::STATUS_NEW, the message, finished/started timestamps and
     * elapsed value are cleared, the retry counter is incremented and the job is persisted.
     *
     * @param RetryableJob $job must also be a StallableJob
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a StallableJob
     */
    protected function resetJob(RetryableJob $job)
    {
        if ($job instanceof StallableJob) {
            $job->setStatus(BaseJob::STATUS_NEW);
            $job->setMessage(null);
            $job->setFinishedAt(null);
            $job->setStartedAt(null);
            $job->setElapsed(null);

            $nextRetries = $job->getRetries() + 1;
            $job->setRetries($nextRetries);
            $this->persist($job);

            return true;
        }

        throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
    }
195
196
    /**
     * NOTE(review): not called from the methods declared in this class body; the return
     * shape is defined by the concrete managers - confirm there before relying on it.
     */
    abstract public function getWorkersAndMethods();
197
198
    /**
     * NOTE(review): not called from the methods declared in this class body; presumably
     * returns a job count restricted by the optional worker and method names - confirm.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     */
    abstract public function countLiveJobs($workerName = null, $methodName = null);
199
200
    /**
     * NOTE(review): not called from the methods declared in this class body; presumably
     * archives the jobs matching the optional worker and method names and reports through
     * $progressCallback like resetStalledJobs()/pruneStalledJobs() - confirm.
     *
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback
     */
    abstract public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null);
201
}
202