Completed
Pull Request — master (#40)
by Matthew
17:20
created

DoctrineJobManager::getStalledJobs()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 6
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\ODM\MongoDB\DocumentRepository;
6
use Doctrine\ORM\EntityRepository;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\RetryableJob;
9
use Dtc\QueueBundle\Model\Job;
10
use Dtc\QueueBundle\Model\JobTiming;
11
use Dtc\QueueBundle\Model\StallableJob;
12
use Dtc\QueueBundle\Model\Run;
13
use Dtc\QueueBundle\Util\Util;
14
15
abstract class DoctrineJobManager extends BaseDoctrineJobManager
16
{
17
    use ProgressCallbackTrait;
18
19
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
20
    const STALLED_SECONDS = 1800;
21
22
    /**
23
     * @param string $objectName
24
     */
25
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
26
27
    public function resetExceptionJobs($workerName = null, $method = null)
28 2
    {
29
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);
30 2
31
        $criterion = ['status' => Job::STATUS_EXCEPTION];
32 2
        $this->addWorkerNameMethod($criterion, $workerName, $method);
33 2
        $saveCount = $this->getSaveCount($count);
34
        $countProcessed = 0;
35 2
        for ($i = 0; $i < $count; $i += $saveCount) {
36 2
            $countProcessed += $this->resetJobsByCriterion(
37 2
                $criterion,
38 2
                $saveCount,
39 2
                $i
40
            );
41 2
        }
42 2
43
        return $countProcessed;
44 2
    }
45
46
    /**
47
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
48
     *
49
     * @param null $workerName
50
     * @param null $method
51
     *
52
     * @return mixed
53
     */
54
    abstract protected function updateExpired($workerName = null, $method = null);
55
56
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
57 9
    {
58
        if (null !== $workerName) {
59 9
            $criterion['workerName'] = $workerName;
60 4
        }
61 4
        if (null !== $method) {
62 9
            $criterion['method'] = $method;
63 4
        }
64 4
    }
65 9
66
    public function pruneExpiredJobs($workerName = null, $method = null)
67 3
    {
68
        $count = $this->updateExpired($workerName, $method);
69 3
        $criterion = ['status' => Job::STATUS_EXPIRED];
70 3
        $this->addWorkerNameMethod($criterion, $workerName, $method);
71 3
        $objectManager = $this->getObjectManager();
72 3
        $repository = $this->getRepository();
73 3
        $finalCount = 0;
74 3
75 3
        $metadata = $this->getObjectManager()->getClassMetadata($this->getJobClass());
76 3
        $identifierData = $metadata->getIdentifier();
77 3
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
78 3
79 3
        $fetchCount = $this->getFetchCount($count);
80
        for ($i = 0; $i < $count; $i += $fetchCount) {
81 3
            $expiredJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, $i);
82 3
            $innerCount = 0;
83 3
            if (!empty($expiredJobs)) {
84 3
                foreach ($expiredJobs as $expiredJob) {
85 3
                    /* @var Job $expiredJob */
86 3
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
87 3
                    $objectManager->remove($expiredJob);
88 3
                    ++$finalCount;
89 3
                    ++$innerCount;
90 3
                }
91 3
            }
92
            $this->flush();
93 3
            for ($j = 0; $j < $innerCount; ++$j) {
94
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
95
            }
96 4
        }
97
98 4
        return $finalCount;
99
    }
100 4
101 4
    protected function getStalledJobs($workerName = null, $method = null)
102
    {
103 4
        $count = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);
104
105 4
        $criterion = ['status' => BaseJob::STATUS_RUNNING];
106
        $this->addWorkerNameMethod($criterion, $workerName, $method);
107
108 4
        $runningJobs = $this->findRunningJobs($criterion, $count);
109
110 4
        return $this->extractStalledJobs($runningJobs);
111 4
    }
112
113 4
    protected function findRunningJobs($criterion, $count)
114 4
    {
115 4
        $repository = $this->getRepository();
116 4
        $runningJobsById = [];
117
118 4
        $metadata = $this->getObjectManager()->getClassMetadata($this->getJobClass());
119 4
        $identifierData = $metadata->getIdentifier();
120 4
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
121 4
122 4
        $fetchCount = $this->getFetchCount($count);
123 4
        for ($i = 0; $i < $count; $i += $fetchCount) {
124
            $runningJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, $i);
125 4
            if (!empty($runningJobs)) {
126
                foreach ($runningJobs as $job) {
127
                    /** @var StallableJob $job */
128
                    $runId = $job->getRunId();
129
                    $runningJobsById[$runId][] = $job;
130
                }
131
            }
132
        }
133 4
134
        return $runningJobsById;
135 4
    }
136 4
137 4
    /**
138 2
     * @param $runId
139 2
     * @param array $jobs
140 2
     * @param array $stalledJobs
141
     */
142 2
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
143 2
    {
144 2
        $objectManager = $this->getObjectManager();
145 4
        $runRepository = $objectManager->getRepository($this->getRunManager()->getRunClass());
146
        if ($run = $runRepository->find($runId)) {
147
            foreach ($jobs as $job) {
148
                if ($run->getCurrentJobId() == $job->getId()) {
149
                    continue;
150
                }
151
                $stalledJobs[] = $job;
152 4
            }
153
        }
154 4
    }
155 4
156 4
    /**
157 4
     * @param array $runningJobsById
158 4
     *
159
     * @return array
160 4
     */
161
    protected function extractStalledJobs(array $runningJobsById)
162
    {
163 4
        $stalledJobs = [];
164
        foreach (array_keys($runningJobsById) as $runId) {
165 4
            if (!$runId && 0 !== $runId) {
166 4
                $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
167
                continue;
168
            }
169 4
            $this->extractStalledLiveRuns($runId, $runningJobsById[$runId], $stalledJobs);
170
            $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId);
171
        }
172
173
        return $stalledJobs;
174 4
    }
175
176 4
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
177 4
    {
178
        $runManager = $this->getRunManager();
179 4
        if (!method_exists($runManager, 'getObjectManager')) {
180 4
            return;
181 4
        }
182 4
        if (!method_exists($runManager, 'getRunArchiveClass')) {
183 4
            return;
184 4
        }
185
186 2
        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
187
        $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());
188 2
        /** @var Run $run */
189 2
        if ($run = $runArchiveRepository->find($runId)) {
190
            if ($endTime = $run->getEndedAt()) {
191 2
                // Did it end over an hour ago
192 2
                if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
193 2
                    $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
194 2
                }
195 2
            }
196 2
        }
197
    }
198 2
199
    /**
200
     * @param int   $i
201 2
     * @param int   $count
202
     * @param int   $saveCount
203 2
     * @param array $stalledJobs
204
     *
205 2
     * @return int
206 2
     */
207 2
    protected function runStalledLoop($i, $count, $saveCount, array $stalledJobs)
208 2
    {
209 2
        $resetCount = 0;
210 2
        for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $count; ++$j) {
211 2
            /* StallableJob $job */
212 2
            $job = $stalledJobs[$j];
213 2
            $job->setStatus(StallableJob::STATUS_STALLED);
214
            if ($this->saveHistory($job)) {
215 2
                ++$resetCount;
216
            }
217
        }
218
219
        return $resetCount;
220
    }
221
222 2
    public function resetStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
223
    {
224 2
        $stalledJobs = $this->getStalledJobs($workerName, $method);
225
        $stalledJobsCount = count($stalledJobs);
226 2
        $this->updateProgress($progressCallback, 0, $stalledJobsCount);
227 2
        $countReset = 0;
228 2
        $saveCount = $this->getSaveCount($stalledJobsCount);
229
        for ($i = 0; $i < $stalledJobsCount; $i += $saveCount) {
230 2
            $resetCount = $this->runStalledLoop($i, $stalledJobsCount, $saveCount, $stalledJobs);
231 2
            for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $stalledJobsCount; ++$j) {
232 2
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
233 2
            }
234 2
            $countReset += $resetCount;
235 2
            $this->flush();
236 2
            $this->updateProgress($progressCallback, $countReset, $stalledJobsCount);
237 2
        }
238
239 2
        return $countReset;
240
    }
241
242 8
    /**
243
     * @param string        $workerName
244 8
     * @param string        $method
245 8
     * @param callable|null $progressCallback
246 8
     */
247
    public function pruneStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
248 8
    {
249
        $stalledJobs = $this->getStalledJobs($workerName, $method);
250
        $stalledJobsCount = count($stalledJobs);
251 43
        $this->updateProgress($progressCallback, 0, $stalledJobsCount);
252
        $countProcessed = 0;
253
        $saveCount = $this->getSaveCount($stalledJobsCount);
254 43
        for ($i = 0; $i < $stalledJobsCount; $i += $saveCount) {
255 43
            for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $stalledJobsCount; ++$j) {
256 43
                /** @var StallableJob $job */
257 43
                $job = $stalledJobs[$j];
258
                $job->setStatus(StallableJob::STATUS_STALLED);
259 43
                $job->setStalls(intval($job->getStalls()) + 1);
260 2
                $this->deleteJob($job);
261 2
                ++$countProcessed;
262 2
            }
263
            $this->flush();
264
            $this->updateProgress($progressCallback, $countProcessed, $stalledJobsCount);
265
        }
266
267 43
        return $countProcessed;
268 43
    }
269 43
270
    protected function stallableSaveHistory(StallableJob $job, $retry)
271 43
    {
272
        if (!$retry) {
273
            $this->deleteJob($job);
274
        }
275
276
        return $retry;
277
    }
278
279
    protected function stallableSave(StallableJob $job)
280
    {
281
        // Generate crc hash for the job
282
        $hashValues = array(get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs());
283
        $crcHash = hash('sha256', serialize($hashValues));
284
        $job->setCrcHash($crcHash);
285
        $objectManager = $this->getObjectManager();
286
287
        if (true === $job->getBatch()) {
288 2
            $oldJob = $this->updateNearestBatch($job);
289
            if ($oldJob) {
290
                return $oldJob;
291
            }
292
        }
293 2
294 2
        // Just save a new job
295 2
        $this->resetSaveOk(__FUNCTION__);
296 2
        $objectManager->persist($job);
297 2
        $objectManager->flush();
298 2
299 2
        return $job;
300 2
    }
301 2
302 2
    abstract protected function updateNearestBatch(Job $job);
303 2
304 2
    /**
305 2
     * @param string $objectName
306
     */
307 2
    abstract protected function stopIdGenerator($objectName);
308 2
309 2
    abstract protected function restoreIdGenerator($objectName);
310 2
311
    /**
312 2
     * @param array $criterion
313
     * @param int   $limit
314 2
     * @param int   $offset
315
     */
316
    private function resetJobsByCriterion(
317 22
        array $criterion,
318
        $limit,
319 22
        $offset
320
    ) {
321
        $objectManager = $this->getObjectManager();
322
        $this->resetSaveOk(__FUNCTION__);
323
        $objectName = $this->getJobClass();
324
        $archiveObjectName = $this->getJobArchiveClass();
325
        $jobRepository = $objectManager->getRepository($objectName);
326
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
327
        $className = $jobRepository->getClassName();
328
        $metadata = $objectManager->getClassMetadata($className);
329
        $this->stopIdGenerator($objectName);
330
        $identifierData = $metadata->getIdentifier();
331
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
332
        $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);
333
        $countProcessed = 0;
334 2
335
        foreach ($results as $jobArchive) {
336 2
            $countProcessed += $this->resetArchiveJob($jobArchive);
337 2
        }
338 2
        $objectManager->flush();
339
340 2
        $this->restoreIdGenerator($objectName);
341
342
        return $countProcessed;
343
    }
344 2
345 2
    protected function resetSaveOk($function)
346 2
    {
347 2
    }
348 2
349 2
    /**
350
     * @param null $workerName
351 2
     * @param null $methodName
352
     * @param bool $prioritize
353
     */
354 8
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
355
356 8
    /**
357
     * @param StallableJob $jobArchive
358
     * @param $className
359 8
     *
360 8
     * @return int Number of jobs reset
361 8
     */
362 8
    protected function resetArchiveJob(StallableJob $jobArchive)
363 8
    {
364 8
        $objectManager = $this->getObjectManager();
365 8
        if ($this->updateMaxStatus($jobArchive, StallableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) {
366 8
            $objectManager->persist($jobArchive);
367
368 8
            return 0;
369
        }
370
371
        /** @var StallableJob $job */
372
        $className = $this->getJobClass();
373
        $newJob = new $className();
374
        Util::copy($jobArchive, $newJob);
375
        $this->resetJob($newJob);
376
        $objectManager->remove($jobArchive);
377
        $this->flush();
378
379
        return 1;
380
    }
381
382
    protected function resetJob(RetryableJob $job)
383
    {
384
        if (!$job instanceof StallableJob) {
385
            throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
386
        }
387
        $job->setStatus(BaseJob::STATUS_NEW);
388
        $job->setMessage(null);
389
        $job->setFinishedAt(null);
390
        $job->setStartedAt(null);
391
        $job->setElapsed(null);
392
        $job->setRetries($job->getRetries() + 1);
393
        $this->getObjectManager()->persist($job);
394
        $this->flush();
395
396
        return true;
397
    }
398
399
    abstract public function getWorkersAndMethods();
400
401
    abstract public function countLiveJobs($workerName = null, $methodName = null);
402
403
    abstract public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null);
404
}
405