Passed
Push — master ( 35a7ab...d29e83 )
by Matthew
07:21
created

DoctrineJobManager::resetExceptionJobs()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 11
nc 2
nop 2
dl 0
loc 17
ccs 10
cts 10
cp 1
crap 2
rs 9.9
c 0
b 0
f 0
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Dtc\QueueBundle\Model\BaseJob;
6
use Dtc\QueueBundle\Model\RetryableJob;
7
use Dtc\QueueBundle\Model\Job;
8
use Dtc\QueueBundle\Model\JobTiming;
9
use Dtc\QueueBundle\Model\StallableJob;
10
11
/**
 * Shared Doctrine job-manager logic: resetting jobs that ended in an exception,
 * pruning expired and stalled jobs, and the save / reset helpers used for
 * stallable jobs.
 *
 * Everything storage-specific is delegated to subclasses through the abstract
 * methods declared below.
 */
abstract class DoctrineJobManager extends BaseDoctrineJobManager
{
    use ProgressCallbackTrait;
    use StalledTrait;

    /** Number of seconds before a job is considered stalled if the runner is no longer active */
    const STALLED_SECONDS = 1800;

    /**
     * Returns how many jobs of the given class currently have the given status.
     *
     * resetExceptionJobs() uses the result as the upper bound of its batch loop.
     *
     * @param string      $objectName class name of the job objects to count
     * @param mixed       $status     status value to match (e.g. Job::STATUS_EXCEPTION)
     * @param string|null $workerName optional worker-name filter
     * @param string|null $method     optional method filter
     */
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);

    /**
     * Resets jobs whose status is Job::STATUS_EXCEPTION, in batches.
     *
     * The number of candidates is counted on the job archive class, then
     * resetJobsByCriterion() is called once per batch with a growing offset.
     *
     * NOTE(review): the offset advances by the batch size on every iteration. If
     * resetJobsByCriterion() changes rows so that they no longer match the criterion,
     * later batches would skip rows - confirm against that helper, which is not
     * defined in this class body.
     *
     * @param string|null $workerName optional worker-name filter
     * @param string|null $method     optional method filter
     *
     * @return int sum of the values returned by resetJobsByCriterion()
     */
    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);

        $criterion = ['status' => Job::STATUS_EXCEPTION];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $saveCount = $this->getSaveCount($count);
        $countProcessed = 0;
        for ($i = 0; $i < $count; $i += $saveCount) {
            $countProcessed += $this->resetJobsByCriterion(
                $criterion,
                $saveCount,
                $i
            );
        }

        return $countProcessed;
    }

    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * pruneExpiredJobs() treats the return value as the number of jobs to process.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed
     */
    abstract protected function updateExpired($workerName = null, $method = null);

    /**
     * Marks expired jobs via updateExpired(), then removes every job with status
     * Job::STATUS_EXPIRED in batches and records one "finished expired" timing per
     * removed job.
     *
     * @param string|null $workerName optional worker-name filter
     * @param string|null $method     optional method filter
     *
     * @return int number of jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;

        // Order by the identifier field so that batches are read deterministically.
        $metadata = $objectManager->getClassMetadata($this->getJobClass());
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';

        $fetchCount = $this->getFetchCount($count);
        for ($i = 0; $i < $count; $i += $fetchCount) {
            // Always read from offset 0: the previous batch has been removed and flushed,
            // so the remaining expired jobs have moved to the front of the result set.
            // A growing offset here would skip $fetchCount jobs on every iteration.
            $expiredJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, 0);
            if (empty($expiredJobs)) {
                // Nothing left that matches; no point issuing further queries.
                break;
            }

            $innerCount = 0;
            foreach ($expiredJobs as $expiredJob) {
                /** @var Job $expiredJob */
                $expiredJob->setStatus(Job::STATUS_EXPIRED);
                $objectManager->remove($expiredJob);
                ++$finalCount;
                ++$innerCount;
            }

            $this->flush();

            // Timings are recorded only after the batch has been flushed.
            // (Property name is spelled as it is referenced throughout this class.)
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
        }

        return $finalCount;
    }

    /**
     * Resets stalled jobs in batches of getSaveCount() jobs.
     *
     * For every batch: runStalledLoop() processes the batch, one "finished stalled"
     * timing is recorded for each job position in the batch, pending changes are
     * flushed and the progress callback is updated.
     *
     * @param string|null   $workerName       optional worker-name filter
     * @param string|null   $method           optional method filter
     * @param callable|null $progressCallback passed through to updateProgress()
     *
     * @return int sum of the values returned by runStalledLoop()
     */
    public function resetStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $stalledJobsCount = count($stalledJobs);
        $this->updateProgress($progressCallback, 0, $stalledJobsCount);
        $countReset = 0;
        $saveCount = $this->getSaveCount($stalledJobsCount);
        for ($i = 0; $i < $stalledJobsCount; $i += $saveCount) {
            $resetCount = $this->runStalledLoop($i, $stalledJobsCount, $saveCount, $stalledJobs);
            // One timing per job in this batch, independent of how many were actually reset.
            for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $stalledJobsCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }
            $countReset += $resetCount;
            $this->flush();
            $this->updateProgress($progressCallback, $countReset, $stalledJobsCount);
        }

        return $countReset;
    }

    /**
     * Deletes stalled jobs in batches of getSaveCount() jobs.
     *
     * Each job is marked StallableJob::STATUS_STALLED and its stall counter is
     * incremented before it is handed to deleteJob(); changes are flushed and the
     * progress callback is updated once per batch.
     *
     * @param string|null   $workerName
     * @param string|null   $method
     * @param callable|null $progressCallback
     *
     * @return int number of jobs processed
     */
    public function pruneStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $stalledJobsCount = count($stalledJobs);
        $this->updateProgress($progressCallback, 0, $stalledJobsCount);
        $countProcessed = 0;
        $saveCount = $this->getSaveCount($stalledJobsCount);
        for ($i = 0; $i < $stalledJobsCount; $i += $saveCount) {
            for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $stalledJobsCount; ++$j) {
                /** @var StallableJob $job */
                $job = $stalledJobs[$j];
                $job->setStatus(StallableJob::STATUS_STALLED);
                $job->setStalls(intval($job->getStalls()) + 1);
                $this->deleteJob($job);
                ++$countProcessed;
            }
            $this->flush();
            $this->updateProgress($progressCallback, $countProcessed, $stalledJobsCount);
        }

        return $countProcessed;
    }

    /**
     * Deletes the job when it is not going to be retried.
     *
     * @param StallableJob $job
     * @param mixed        $retry falsy when the job will not be retried
     *
     * @return mixed the $retry value, unchanged
     */
    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if (!$retry) {
            $this->deleteJob($job);
        }

        return $retry;
    }

    /**
     * Stores a hash on the job and persists it, unless it is a batch job that
     * updateNearestBatch() resolved to an existing job.
     *
     * @param StallableJob $job
     *
     * @return mixed the value returned by updateNearestBatch() when it is truthy,
     *               otherwise the newly persisted $job
     */
    protected function stallableSave(StallableJob $job)
    {
        // The "crc" hash is a SHA-256 over the job class, method, worker name and arguments.
        $hashValues = [get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs()];
        $crcHash = hash('sha256', serialize($hashValues));
        $job->setCrcHash($crcHash);

        if (true === $job->getBatch()) {
            $oldJob = $this->updateNearestBatch($job);
            if ($oldJob) {
                return $oldJob;
            }
        }

        // Just save a new job
        $this->persist($job);

        return $job;
    }

    /**
     * Called by stallableSave() for batch jobs; a truthy return value is handed back
     * to the caller instead of persisting $job.
     *
     * @param Job $job
     */
    abstract protected function updateNearestBatch(Job $job);

    /**
     * @param string $objectName
     */
    abstract protected function stopIdGenerator($objectName);

    /**
     * Counterpart of stopIdGenerator() (presumably undoes it - confirm in implementations).
     *
     * @param string $objectName
     */
    abstract protected function restoreIdGenerator($objectName);

    /**
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     */
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);

    /**
     * Puts a job back into the BaseJob::STATUS_NEW state: clears message, timestamps
     * and elapsed time, increments the retry counter and persists the job.
     *
     * @param RetryableJob $job must also be a StallableJob
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a StallableJob
     */
    protected function resetJob(RetryableJob $job)
    {
        if (!$job instanceof StallableJob) {
            throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
        }
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setFinishedAt(null);
        $job->setStartedAt(null);
        $job->setElapsed(null);
        $job->setRetries($job->getRetries() + 1);
        $this->persist($job);

        return true;
    }

    abstract public function getWorkersAndMethods();

    /**
     * @param string|null $workerName
     * @param string|null $methodName
     */
    abstract public function countLiveJobs($workerName = null, $methodName = null);

    /**
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback
     */
    abstract public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null);
}
203