Completed
Pull Request — master (#40)
by Matthew
19:38 queued 15:57
created

DoctrineJobManager::pruneStalledJobs()   B

Complexity

Conditions 4
Paths 3

Size

Total Lines 22
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 22
ccs 18
cts 18
cp 1
rs 8.9197
c 0
b 0
f 0
cc 4
eloc 16
nc 3
nop 3
crap 4
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\ODM\MongoDB\DocumentRepository;
6
use Doctrine\ORM\EntityRepository;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\RetryableJob;
9
use Dtc\QueueBundle\Model\Job;
10
use Dtc\QueueBundle\Model\JobTiming;
11
use Dtc\QueueBundle\Model\StallableJob;
12
use Dtc\QueueBundle\Model\Run;
13
use Dtc\QueueBundle\Util\Util;
14
15
abstract class DoctrineJobManager extends BaseDoctrineJobManager
16
{
17
    use ProgressCallbackTrait;
18
19
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
20
    const STALLED_SECONDS = 1800;
21
22
    /**
     * Counts the jobs of the given class that have the given status.
     *
     * @param string      $objectName Class name of the job objects to count (callers in this class pass the job or job-archive class)
     * @param mixed       $status     Status to match (callers in this class pass Job::STATUS_* constants)
     * @param string|null $workerName Optional worker-name filter
     * @param string|null $method     Optional method filter
     *
     * @return mixed Callers use the result as the number of matching jobs — presumably an int; confirm in implementations
     */
25
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
26
27 2
    /**
     * Resets the archived jobs whose status is Job::STATUS_EXCEPTION, one batch at a time.
     *
     * @param string|null $workerName Restrict to this worker name unless null
     * @param string|null $method     Restrict to this method unless null
     *
     * @return int Sum of the values returned by resetJobsByCriterion() for each batch
     */
    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $filter = ['status' => Job::STATUS_EXCEPTION];
        $this->addWorkerNameMethod($filter, $workerName, $method);

        $total = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);
        $batchSize = $this->getSaveCount($total);

        // NOTE(review): the offset grows per batch while resetJobsByCriterion() may take
        // processed rows out of the matching set — confirm no rows are skipped.
        $resetTotal = 0;
        $offset = 0;
        while ($offset < $total) {
            $resetTotal += $this->resetJobsByCriterion($filter, $batchSize, $offset);
            $offset += $batchSize;
        }

        return $resetTotal;
    }
45
46
    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * @param string|null $workerName Optional worker-name filter
     * @param string|null $method     Optional method filter
     *
     * @return mixed pruneExpiredJobs() uses the result as the number of jobs to page through — presumably an int count; confirm in implementations
     */
54
    abstract protected function updateExpired($workerName = null, $method = null);
55
56 9
    /**
     * Adds the optional worker-name and method filters to a criterion array, in place.
     *
     * @param array       $criterion  Criterion to extend (passed by reference)
     * @param string|null $workerName Stored under 'workerName' unless null
     * @param string|null $method     Stored under 'method' unless null
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        $optionalFilters = ['workerName' => $workerName, 'method' => $method];
        foreach ($optionalFilters as $field => $value) {
            if (null === $value) {
                continue;
            }
            $criterion[$field] = $value;
        }
    }
65
66 3
    /**
     * Marks expired jobs through updateExpired() and then removes them in batches.
     *
     * A timing entry with JobTiming::STATUS_FINISHED_EXPIRED is recorded for every removed job
     * once the batch containing it has been flushed.
     *
     * @param string|null $workerName Restrict to this worker name unless null
     * @param string|null $method     Restrict to this method unless null
     *
     * @return int Number of jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;

        // Sort on the identifier field so batches come back in a stable order
        $metadata = $objectManager->getClassMetadata($this->getJobClass());
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';

        $fetchCount = $this->getFetchCount($count);
        for ($i = 0; $i < $count; $i += $fetchCount) {
            // Each batch is removed and flushed before the next query, so the jobs still to be
            // pruned move to the front of the result set. Always read from offset 0; paging
            // with a growing offset would skip them.
            $expiredJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, 0);
            $innerCount = 0;
            if (!empty($expiredJobs)) {
                foreach ($expiredJobs as $expiredJob) {
                    /* @var Job $expiredJob */
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
                    $objectManager->remove($expiredJob);
                    ++$finalCount;
                    ++$innerCount;
                }
            }
            $this->flush();
            if (0 === $innerCount) {
                // Nothing left that matches; further queries would return nothing either
                break;
            }
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
        }

        return $finalCount;
    }
100
101 4
    /**
     * Finds the jobs with running status that are considered stalled.
     *
     * @param string|null $workerName Restrict to this worker name unless null
     * @param string|null $method     Restrict to this method unless null
     *
     * @return array Stalled jobs, as produced by extractStalledJobs()
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $criterion = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $runningCount = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);

        return $this->extractStalledJobs($this->findRunningJobs($criterion, $runningCount));
    }
112
113 4
    /**
     * Pages through the jobs matching the criterion and groups them by run id.
     *
     * @param array $criterion Criteria handed to the repository's findBy()
     * @param mixed $count     Number of jobs to page through (callers pass the result of countJobsByStatus())
     *
     * @return array Lists of jobs, keyed by the value each job returns from getRunId()
     */
    protected function findRunningJobs($criterion, $count)
    {
        $jobRepository = $this->getRepository();

        // Sort on the identifier field so pages come back in a stable order
        $identifier = $this->getObjectManager()->getClassMetadata($this->getJobClass())->getIdentifier();
        $sortField = isset($identifier[0]) ? $identifier[0] : 'id';

        $pageSize = $this->getFetchCount($count);
        $jobsByRunId = [];
        $offset = 0;
        while ($offset < $count) {
            $page = $jobRepository->findBy($criterion, [$sortField => 'ASC'], $pageSize, $offset);
            $offset += $pageSize;
            if (empty($page)) {
                continue;
            }
            foreach ($page as $runningJob) {
                /** @var StallableJob $runningJob */
                $jobsByRunId[$runningJob->getRunId()][] = $runningJob;
            }
        }

        return $jobsByRunId;
    }
136
137
    /**
     * Appends to $stalledJobs every job of a live run that is not the run's current job.
     *
     * Nothing is appended when no live run is found for $runId.
     *
     * @param mixed $runId       Id used to look up the live run
     * @param array $jobs        Running jobs recorded against that run id
     * @param array $stalledJobs Accumulator (passed by reference) receiving the stalled jobs
     */
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
    {
        $liveRun = $this->getObjectManager()
            ->getRepository($this->getRunManager()->getRunClass())
            ->find($runId);
        if (!$liveRun) {
            return;
        }

        foreach ($jobs as $candidate) {
            // The job the run is working on right now is not stalled
            if ($liveRun->getCurrentJobId() != $candidate->getId()) {
                $stalledJobs[] = $candidate;
            }
        }
    }
155
156
    /**
     * Picks the stalled jobs out of a set of running jobs grouped by run id.
     *
     * Jobs grouped under an empty run id are all treated as stalled; for every other run id
     * the live run and the run archive are both consulted.
     *
     * @param array $runningJobsById Lists of running jobs keyed by run id
     *
     * @return array The stalled jobs
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $stalled = [];
        foreach ($runningJobsById as $runId => $jobsForRun) {
            // A run id of integer 0 is a real id; any other falsy key means "no run"
            if ($runId || 0 === $runId) {
                $this->extractStalledLiveRuns($runId, $jobsForRun, $stalled);
                $this->extractStalledJobsRunArchive($runningJobsById, $stalled, $runId);
                continue;
            }
            $stalled = array_merge($stalled, $jobsForRun);
        }

        return $stalled;
    }
175
176 4
    /**
     * Appends all running jobs of a run to $stalledJobs when the archived run ended more than
     * STALLED_SECONDS ago.
     *
     * Does nothing when the run manager lacks getObjectManager() or getRunArchiveClass(), when
     * no archived run is found for $runId, or when the archived run has no end time.
     *
     * @param array $runningJobsById Lists of running jobs keyed by run id
     * @param array $stalledJobs     Accumulator (passed by reference) receiving the stalled jobs
     * @param mixed $runId           Id of the run to look up in the run archive
     */
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
    {
        $runManager = $this->getRunManager();
        $supportsArchive = method_exists($runManager, 'getObjectManager')
            && method_exists($runManager, 'getRunArchiveClass');
        if (!$supportsArchive) {
            return;
        }

        /** @var EntityRepository|DocumentRepository $archiveRepository */
        $archiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());

        /** @var Run $archivedRun */
        $archivedRun = $archiveRepository->find($runId);
        if (!$archivedRun) {
            return;
        }

        $endedAt = $archivedRun->getEndedAt();
        if (!$endedAt) {
            return;
        }

        // Only treat the jobs as stalled once the run has been over for more than STALLED_SECONDS
        $secondsSinceEnd = time() - $endedAt->getTimestamp();
        if ($secondsSinceEnd > static::STALLED_SECONDS) {
            $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
        }
    }
198
199
    /**
     * Marks one batch of stalled jobs as stalled and saves their history.
     *
     * @param int   $i           Index of the first job in the batch
     * @param int   $count       Total number of stalled jobs
     * @param int   $saveCount   Maximum number of jobs in the batch
     * @param array $stalledJobs All stalled jobs, indexed from 0
     *
     * @return int Number of jobs in the batch for which saveHistory() returned a truthy value
     */
    protected function runStalledLoop($i, $count, $saveCount, array $stalledJobs)
    {
        // The batch ends at its size limit or at the end of the list, whichever comes first
        $batchEnd = min($i + $saveCount, $count);
        $retried = 0;
        for ($index = $i; $index < $batchEnd; ++$index) {
            /* StallableJob $stalledJob */
            $stalledJob = $stalledJobs[$index];
            $stalledJob->setStatus(StallableJob::STATUS_STALLED);
            $retried += $this->saveHistory($stalledJob) ? 1 : 0;
        }

        return $retried;
    }
221
222 2
    /**
     * Marks every stalled job as stalled and saves its history, batch by batch.
     *
     * A timing entry with JobTiming::STATUS_FINISHED_STALLED is recorded for every stalled job,
     * and progress is reported before the first batch and after each batch.
     *
     * @param string|null   $workerName       Restrict to this worker name unless null
     * @param string|null   $method           Restrict to this method unless null
     * @param callable|null $progressCallback Handed to updateProgress() along with the running totals
     *
     * @return int Number of jobs for which saveHistory() returned a truthy value
     */
    public function resetStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $total = count($stalledJobs);
        $this->updateProgress($progressCallback, 0, $total);

        $batchSize = $this->getSaveCount($total);
        $resetTotal = 0;
        for ($start = 0; $start < $total; $start += $batchSize) {
            $resetTotal += $this->runStalledLoop($start, $total, $batchSize, $stalledJobs);

            // One timing entry per job in the batch, whether or not it was reset
            $batchEnd = min($start + $batchSize, $total);
            for ($index = $start; $index < $batchEnd; ++$index) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }

            $this->flush();
            $this->updateProgress($progressCallback, $resetTotal, $total);
        }

        return $resetTotal;
    }
241
242
    /**
     * Marks every stalled job as stalled, bumps its stall counter and deletes it, batch by batch.
     *
     * Progress is reported before the first batch and after each batch.
     *
     * @param string        $workerName       Restrict to this worker name unless null
     * @param string        $method           Restrict to this method unless null
     * @param callable|null $progressCallback Handed to updateProgress() along with the running totals
     *
     * @return int Number of jobs processed
     */
    public function pruneStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $total = count($stalledJobs);
        $this->updateProgress($progressCallback, 0, $total);

        $batchSize = $this->getSaveCount($total);
        $pruned = 0;
        for ($start = 0; $start < $total; $start += $batchSize) {
            $batchEnd = min($start + $batchSize, $total);
            for ($index = $start; $index < $batchEnd; ++$index) {
                /** @var StallableJob $stalledJob */
                $stalledJob = $stalledJobs[$index];
                $stalledJob->setStatus(StallableJob::STATUS_STALLED);
                $stallsSoFar = intval($stalledJob->getStalls());
                $stalledJob->setStalls($stallsSoFar + 1);
                $this->deleteJob($stalledJob);
                ++$pruned;
            }

            $this->flush();
            $this->updateProgress($progressCallback, $pruned, $total);
        }

        return $pruned;
    }
269
270 8
    /**
     * Deletes the job unless it is being retried, and hands the retry flag back.
     *
     * @param StallableJob $job   Job whose history is being saved
     * @param mixed        $retry Truthy when the job is being retried and must be kept
     *
     * @return mixed The $retry value, unchanged
     */
    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if ($retry) {
            return $retry;
        }

        $this->deleteJob($job);

        return $retry;
    }
278
279 43
    /**
     * Stamps the job with a hash of its identifying fields and persists it.
     *
     * When the job's batch flag is exactly true and updateNearestBatch() returns a truthy
     * value, that value is returned and the given job is not persisted.
     *
     * @param StallableJob $job Job to save
     *
     * @return mixed The persisted $job, or the truthy value returned by updateNearestBatch()
     */
    protected function stallableSave(StallableJob $job)
    {
        // Stored through setCrcHash(), but the digest is SHA-256 over the serialized
        // class name, method, worker name and arguments
        $fingerprint = serialize([get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs()]);
        $job->setCrcHash(hash('sha256', $fingerprint));
        $objectManager = $this->getObjectManager();

        if (true === $job->getBatch()) {
            $existingJob = $this->updateNearestBatch($job);
            if ($existingJob) {
                return $existingJob;
            }
        }

        // No batch job took its place: store it as a new job
        $this->resetSaveOk(__FUNCTION__);
        $objectManager->persist($job);
        $objectManager->flush();

        return $job;
    }
301
302
    /**
     * Called by stallableSave() for jobs whose batch flag is true.
     *
     * @param Job $job Job that is about to be saved
     *
     * @return mixed A truthy value is returned by stallableSave() in place of $job, which is then not persisted
     */
    abstract protected function updateNearestBatch(Job $job);
303
304
    /**
     * Called by resetJobsByCriterion() before archived jobs are copied back into the job class.
     * NOTE(review): presumably suspends automatic id generation so copied jobs keep their ids — confirm in implementations.
     *
     * @param string $objectName Class name of the job object (resetJobsByCriterion() passes getJobClass())
     */
307
    abstract protected function stopIdGenerator($objectName);
308
309
    /**
     * Counterpart of stopIdGenerator(); resetJobsByCriterion() calls it after a batch has been processed.
     *
     * @param string $objectName Class name of the job object, as passed to stopIdGenerator()
     */
    abstract protected function restoreIdGenerator($objectName);
310
311
    /**
     * Resets one batch of archived jobs matching the criterion.
     *
     * The id generator for the job class is stopped while the batch is processed and is always
     * restored afterwards, including when fetching, resetting or flushing throws.
     *
     * @param array $criterion Criteria handed to the archive repository's findBy()
     * @param int   $limit     Maximum number of archived jobs to fetch
     * @param int   $offset    Offset of the first archived job to fetch
     *
     * @return int Sum of the values returned by resetArchiveJob() for the fetched jobs
     */
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset
    ) {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getJobClass();
        $archiveObjectName = $this->getJobArchiveClass();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $countProcessed = 0;

        $this->stopIdGenerator($objectName);
        try {
            // Sort on the identifier field so batches come back in a stable order
            $identifierData = $metadata->getIdentifier();
            $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
            $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

            foreach ($results as $jobArchive) {
                $countProcessed += $this->resetArchiveJob($jobArchive);
            }
            $objectManager->flush();
        } finally {
            // Without this, a failure above would leave the id generator stopped
            $this->restoreIdGenerator($objectName);
        }

        return $countProcessed;
    }
344
345 22
    /**
     * Hook called by stallableSave() and resetJobsByCriterion() before they persist or reset jobs.
     *
     * This implementation does nothing.
     * NOTE(review): presumably meant to be overridden by subclasses — confirm.
     *
     * @param string $function Name of the calling method (callers in this class pass __FUNCTION__)
     */
    protected function resetSaveOk($function)
    {
        // No-op in this class.
    }
348
349
    /**
     * NOTE(review): presumably builds the query used to fetch jobs, optionally filtered by
     * worker name and method — confirm in implementations.
     *
     * @param string|null $workerName Optional worker-name filter
     * @param string|null $methodName Optional method filter
     * @param bool        $prioritize NOTE(review): presumably whether to order by priority — confirm
     *
     * @return mixed NOTE(review): presumably a Doctrine query builder — confirm in implementations
     */
354
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
355
356
    /**
     * Copies one archived job back into the job class and removes it from the archive, unless
     * updateMaxStatus() returns a truthy value for StallableJob::STATUS_MAX_RETRIES.
     *
     * @param StallableJob $jobArchive Archived job to reset
     *
     * @return int Number of jobs reset: 0 when updateMaxStatus() returned a truthy value, otherwise 1
     */
    protected function resetArchiveJob(StallableJob $jobArchive)
    {
        $objectManager = $this->getObjectManager();

        $maxRetries = $jobArchive->getMaxRetries();
        $retries = $jobArchive->getRetries();
        $maxStatusApplied = $this->updateMaxStatus($jobArchive, StallableJob::STATUS_MAX_RETRIES, $maxRetries, $retries);

        if (!$maxStatusApplied) {
            $jobClass = $this->getJobClass();
            /** @var StallableJob $freshJob */
            $freshJob = new $jobClass();
            Util::copy($jobArchive, $freshJob);
            $this->resetJob($freshJob);
            $objectManager->remove($jobArchive);
            $this->flush();

            return 1;
        }

        // The archived job stays in the archive; queue it to be saved with the next flush
        $objectManager->persist($jobArchive);

        return 0;
    }
381
382 8
    /**
     * Puts a job back into the new state, clears the data of its previous run, counts one
     * more retry, and persists and flushes it.
     *
     * @param RetryableJob $job Job to reset; must also be a StallableJob
     *
     * @return bool Always true
     *
     * @throws \InvalidArgumentException When $job is not a StallableJob
     */
    protected function resetJob(RetryableJob $job)
    {
        if ($job instanceof StallableJob) {
            $job->setStatus(BaseJob::STATUS_NEW);
            $job->setMessage(null);
            $job->setFinishedAt(null);
            $job->setStartedAt(null);
            $job->setElapsed(null);

            $retriesSoFar = $job->getRetries();
            $job->setRetries($retriesSoFar + 1);

            $this->getObjectManager()->persist($job);
            $this->flush();

            return true;
        }

        throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
    }
398
399
    /**
     * NOTE(review): presumably lists the worker names and methods that have jobs — confirm in implementations.
     *
     * @return mixed
     */
    abstract public function getWorkersAndMethods();
400
401
    /**
     * NOTE(review): presumably counts the jobs that are still waiting to run — confirm in implementations.
     *
     * @param string|null $workerName Optional worker-name filter
     * @param string|null $methodName Optional method filter
     *
     * @return mixed NOTE(review): presumably an int count — confirm in implementations
     */
    abstract public function countLiveJobs($workerName = null, $methodName = null);
402
403
    /**
     * NOTE(review): presumably moves all matching jobs into the job archive — confirm in implementations.
     *
     * @param string|null   $workerName       Optional worker-name filter
     * @param string|null   $methodName       Optional method filter
     * @param callable|null $progressCallback NOTE(review): presumably invoked with progress counts, as in resetStalledJobs() — confirm
     */
    abstract public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null);
404
}
405