DoctrineJobManager::pruneExpiredJobs()   B
last analyzed

Complexity

Conditions 6
Paths 10

Size

Total Lines 33
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
eloc 23
nc 10
nop 2
dl 0
loc 33
ccs 24
cts 24
cp 1
crap 6
rs 8.9297
c 0
b 0
f 0
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Dtc\QueueBundle\Model\BaseJob;
6
use Dtc\QueueBundle\Model\Job;
7
use Dtc\QueueBundle\Model\JobTiming;
8
use Dtc\QueueBundle\Model\RetryableJob;
9
use Dtc\QueueBundle\Model\StallableJob;
10
11
abstract class DoctrineJobManager extends BaseDoctrineJobManager
12
{
13 1
    use ProgressCallbackTrait;
14 1
    use StalledTrait;
15
16
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
17
    public const STALLED_SECONDS = 1800;
18
19
    /**
     * Counts jobs of the given class that have the given status.
     *
     * resetExceptionJobs() calls this with the job archive class and Job::STATUS_EXCEPTION and
     * uses the result as the total number of jobs to walk through in batches.
     *
     * @param string      $objectName class name of the job objects to count
     * @param mixed       $status     status to match, e.g. Job::STATUS_EXCEPTION
     * @param string|null $workerName presumably an optional worker-name filter; confirm in implementations
     * @param string|null $method     presumably an optional method filter; confirm in implementations
     *
     * @return mixed used as a numeric loop bound by callers; presumably an int, confirm in implementations
     */
22
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
23
24 2
    /**
     * Resets archived jobs that ended in Job::STATUS_EXCEPTION, working through them in batches.
     *
     * The total is taken from countJobsByStatus() on the job archive class; each batch is
     * delegated to resetJobsByCriterion() with the batch size and the current offset.
     *
     * @param string|null $workerName optional worker name added to the criterion
     * @param string|null $method     optional method added to the criterion
     *
     * @return int sum of the per-batch results of resetJobsByCriterion() (presumably the number of jobs reset)
     */
    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $total = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);

        $filter = ['status' => Job::STATUS_EXCEPTION];
        $this->addWorkerNameMethod($filter, $workerName, $method);
        $batchSize = $this->getSaveCount($total);

        $resetTotal = 0;
        $offset = 0;
        while ($offset < $total) {
            $resetTotal += $this->resetJobsByCriterion($filter, $batchSize, $offset);
            $offset += $batchSize;
        }

        return $resetTotal;
    }
42
43
    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * pruneExpiredJobs() calls this first and uses the return value as the number of
     * expired jobs it should fetch and remove.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed used as a numeric loop bound by pruneExpiredJobs(); presumably the number
     *               of jobs that were marked as expired, confirm in implementations
     */
51
    abstract protected function updateExpired($workerName = null, $method = null);
52
53 3
    /**
     * Marks expired jobs through updateExpired() and then removes every job with
     * Job::STATUS_EXPIRED that matches the optional worker name / method, in batches.
     *
     * After each batch has been flushed, one JobTiming::STATUS_FINISHED_EXPIRED timing is
     * recorded per job removed in that batch.
     *
     * @param string|null $workerName optional worker name added to the criterion
     * @param string|null $method     optional method added to the criterion
     *
     * @return int the number of jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;

        // Order by the identifier field so that batches are fetched deterministically.
        $identifierData = $objectManager->getClassMetadata($this->getJobClass())->getIdentifier();
        $idColumn = $identifierData[0] ?? 'id';

        $fetchCount = $this->getFetchCount($count);
        for ($i = 0; $i < $count; $i += $fetchCount) {
            // Always read from offset 0: the previous batch was removed and flushed, so the
            // remaining expired jobs have moved to the front of the result set. Paging with a
            // growing offset would skip over jobs and leave them behind.
            $expiredJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, 0);
            if (empty($expiredJobs)) {
                // Nothing left that matches; further iterations would find nothing either.
                break;
            }

            $innerCount = 0;
            foreach ($expiredJobs as $expiredJob) {
                /* @var Job $expiredJob */
                // The criterion already selects this status; it is set again before removal,
                // presumably for anything that observes the removal (TODO confirm).
                $expiredJob->setStatus(Job::STATUS_EXPIRED);
                $objectManager->remove($expiredJob);
                ++$finalCount;
                ++$innerCount;
            }

            $this->flush();

            // Timings are only recorded once the removals have been flushed.
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
        }

        return $finalCount;
    }
87
88 2
    /**
     * Resets the stalled jobs for the given worker / method in batches, reporting progress.
     *
     * For each batch the jobs are handed to runStalledLoop(), one
     * JobTiming::STATUS_FINISHED_STALLED timing is recorded per job in the batch, pending
     * changes are flushed and the progress callback is updated.
     *
     * @param string|null   $workerName       passed to getStalledJobs()
     * @param string|null   $method           passed to getStalledJobs()
     * @param callable|null $progressCallback passed to updateProgress() with the running and total counts
     *
     * @return int sum of the values returned by runStalledLoop()
     */
    public function resetStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalled = $this->getStalledJobs($workerName, $method);
        $total = count($stalled);
        $this->updateProgress($progressCallback, 0, $total);

        $resetSoFar = 0;
        $batchSize = $this->getSaveCount($total);
        $start = 0;
        while ($start < $total) {
            $batchReset = $this->runStalledLoop($start, $total, $batchSize, $stalled);

            // One timing entry per job in this batch; the final batch may be shorter.
            $end = min($start + $batchSize, $total);
            for ($index = $start; $index < $end; ++$index) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }

            $resetSoFar += $batchReset;
            $this->flush();
            $this->updateProgress($progressCallback, $resetSoFar, $total);

            $start += $batchSize;
        }

        return $resetSoFar;
    }
107
108
    /**
     * Deletes the stalled jobs for the given worker / method in batches, reporting progress.
     *
     * Each job is marked StallableJob::STATUS_STALLED and has its stall counter incremented
     * before being passed to deleteJob(); changes are flushed once per batch.
     *
     * @param string|null   $workerName       passed to getStalledJobs()
     * @param string|null   $method           passed to getStalledJobs()
     * @param callable|null $progressCallback passed to updateProgress() with the running and total counts
     *
     * @return int the number of jobs processed
     */
    public function pruneStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalled = $this->getStalledJobs($workerName, $method);
        $total = count($stalled);
        $this->updateProgress($progressCallback, 0, $total);

        $pruned = 0;
        $batchSize = $this->getSaveCount($total);
        $start = 0;
        while ($start < $total) {
            // The final batch may be shorter than the batch size.
            $end = min($start + $batchSize, $total);
            for ($index = $start; $index < $end; ++$index) {
                /** @var StallableJob $stalledJob */
                $stalledJob = $stalled[$index];
                $stalledJob->setStatus(StallableJob::STATUS_STALLED);
                $stalledJob->setStalls((int) $stalledJob->getStalls() + 1);
                $this->deleteJob($stalledJob);
                ++$pruned;
            }

            $this->flush();
            $this->updateProgress($progressCallback, $pruned, $total);

            $start += $batchSize;
        }

        return $pruned;
    }
134
135 12
    /**
     * Deletes the job unless it is going to be retried.
     *
     * @param StallableJob $job   the job being considered
     * @param mixed        $retry truthy when the job will be retried
     *
     * @return mixed the $retry value, unchanged
     */
    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if ($retry) {
            return $retry;
        }

        // Not retrying: the job is handed to deleteJob().
        $this->deleteJob($job);

        return $retry;
    }
143
144 43
    /**
     * Stamps the job with its hash and persists it, unless a batch job could be merged
     * into an existing one.
     *
     * The value stored through setCrcHash() is a SHA-256 digest of the serialized job class,
     * method, worker name and arguments.
     *
     * @param StallableJob $job the job to save
     *
     * @return mixed the truthy result of updateNearestBatch() for batch jobs, otherwise $job
     */
    protected function stallableSave(StallableJob $job)
    {
        $fingerprint = serialize([
            get_class($job),
            $job->getMethod(),
            $job->getWorkerName(),
            $job->getArgs(),
        ]);
        $job->setCrcHash(hash('sha256', $fingerprint));

        if (true === $job->getBatch()) {
            $existingJob = $this->updateNearestBatch($job);
            if ($existingJob) {
                // Return the job reported by updateNearestBatch(); $job is not persisted.
                return $existingJob;
            }
        }

        $this->persist($job);

        return $job;
    }
163
164
    /**
     * Called by stallableSave() for jobs whose getBatch() is strictly true.
     *
     * A truthy return value is handed back by stallableSave() in place of $job, and $job is
     * then not persisted; a falsy return value makes stallableSave() persist $job.
     *
     * @param Job $job the batch job being saved
     *
     * @return mixed presumably the existing job that was updated, or a falsy value when none
     *               matched; confirm in implementations
     */
    abstract protected function updateNearestBatch(Job $job);
165
166
    /**
     * NOTE(review): presumably suspends automatic identifier generation for the given class;
     * not called within this class, so confirm against the concrete managers.
     *
     * @param string $objectName
     */
169
    abstract protected function stopIdGenerator($objectName);
170
171
    /**
     * NOTE(review): presumably the counterpart of stopIdGenerator() for the given class;
     * not called within this class, so confirm against the concrete managers.
     *
     * @param mixed $objectName presumably a class name, as for stopIdGenerator(); confirm
     */
    abstract protected function restoreIdGenerator($objectName);
172
173
    /**
     * NOTE(review): presumably builds the query used to select jobs, optionally restricted to
     * a worker name and method; not called within this class, so confirm against the
     * concrete managers.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize defaults to true
     */
178
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
179
180 8
    /**
     * Puts a job back into the BaseJob::STATUS_NEW state and persists it.
     *
     * The message, start / finish times and elapsed time are cleared and the retry counter
     * is incremented by one.
     *
     * @param RetryableJob $job must also be a StallableJob
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a StallableJob
     */
    protected function resetJob(RetryableJob $job)
    {
        if ($job instanceof StallableJob) {
            $job->setStatus(BaseJob::STATUS_NEW);

            // Clear everything left over from the previous run.
            $job->setMessage(null);
            $job->setFinishedAt(null);
            $job->setStartedAt(null);
            $job->setElapsed(null);

            $retriesSoFar = $job->getRetries();
            $job->setRetries($retriesSoFar + 1);
            $this->persist($job);

            return true;
        }

        throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
    }
195
196
    /**
     * NOTE(review): presumably returns the worker names and their methods known to the
     * manager; the return shape is not visible here, so confirm against the concrete managers.
     */
    abstract public function getWorkersAndMethods();
197
198
    /**
     * NOTE(review): presumably counts the jobs that are still live, optionally restricted to
     * a worker name and method; confirm against the concrete managers.
     *
     * @param mixed $workerName defaults to null
     * @param mixed $methodName defaults to null
     */
    abstract public function countLiveJobs($workerName = null, $methodName = null);
199
200
    /**
     * NOTE(review): presumably archives all jobs, optionally restricted to a worker name and
     * method, reporting progress through the callback; confirm against the concrete managers.
     *
     * @param mixed         $workerName       defaults to null
     * @param mixed         $methodName       defaults to null
     * @param callable|null $progressCallback defaults to null
     */
    abstract public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null);
201
}
202