Completed
Pull Request — master (#53)
by Matthew
07:37
created

DoctrineJobManager::extractStalledJobs()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 14
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 14
ccs 9
cts 9
cp 1
rs 9.2
c 0
b 0
f 0
cc 4
eloc 9
nc 3
nop 1
crap 4
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\ODM\MongoDB\DocumentRepository;
6
use Doctrine\ORM\EntityRepository;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\RetryableJob;
9
use Dtc\QueueBundle\Model\Job;
10
use Dtc\QueueBundle\Model\JobTiming;
11
use Dtc\QueueBundle\Model\StallableJob;
12
use Dtc\QueueBundle\Model\Run;
13
use Dtc\QueueBundle\Util\Util;
14
15
abstract class DoctrineJobManager extends BaseDoctrineJobManager
16
{
17
    use ProgressCallbackTrait;
18
19
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
20
    const STALLED_SECONDS = 1800;
21
22
    /**
23
     * @param string $objectName
24
     */
25
    // The returned value is used by resetExceptionJobs() and getStalledJobs() as the total
    // number of matching jobs when paging through them.
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
26
27 2
    /**
     * Resets archived jobs whose status is Job::STATUS_EXCEPTION, one batch at a time.
     *
     * @param string|null $workerName only consider this worker when not null
     * @param string|null $method     only consider this method when not null
     *
     * @return int sum of the counts returned by resetJobsByCriterion() for each batch
     */
    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $total = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);

        $filter = ['status' => Job::STATUS_EXCEPTION];
        $this->addWorkerNameMethod($filter, $workerName, $method);

        $batchSize = $this->getSaveCount($total);
        $resetTotal = 0;
        $offset = 0;
        // NOTE(review): the offset advances while processed archive rows are removed by
        // resetArchiveJob(); confirm that paging by offset cannot skip rows here.
        while ($offset < $total) {
            $resetTotal += $this->resetJobsByCriterion($filter, $batchSize, $offset);
            $offset += $batchSize;
        }

        return $resetTotal;
    }
45
46
    /**
47
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
48
     *
49
     * @param null $workerName
50
     * @param null $method
51
     *
52
     * @return mixed
53
     */
54
    // pruneExpiredJobs() uses the return value as the number of expired jobs it has to remove.
    abstract protected function updateExpired($workerName = null, $method = null);
55
56 9
    /**
     * Adds the optional worker name / method filters to a criterion array in place.
     *
     * @param array       $criterion  criterion to extend (modified by reference)
     * @param string|null $workerName stored under 'workerName' when not null
     * @param string|null $method     stored under 'method' when not null
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        $optionalFilters = ['workerName' => $workerName, 'method' => $method];
        foreach ($optionalFilters as $field => $value) {
            // null means "do not filter on this field"
            if (null === $value) {
                continue;
            }
            $criterion[$field] = $value;
        }
    }
65
66 3
    /**
     * Marks expired jobs through updateExpired() and then removes them in batches.
     *
     * After each batch is flushed, one JobTiming::STATUS_FINISHED_EXPIRED timing is
     * recorded per removed job.
     *
     * @param string|null $workerName only consider this worker when not null
     * @param string|null $method     only consider this method when not null
     *
     * @return int number of expired jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;

        $metadata = $objectManager->getClassMetadata($this->getJobClass());
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';

        $fetchCount = $this->getFetchCount($count);
        for ($i = 0; $i < $count; $i += $fetchCount) {
            // Always read from offset 0: the previous batch has been removed and flushed, so the
            // remaining expired jobs have moved to the front of the result set. Advancing the
            // offset as well would skip every other batch.
            $expiredJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, 0);
            if (empty($expiredJobs)) {
                // Nothing left that matches; no point in issuing further queries.
                break;
            }

            $innerCount = 0;
            foreach ($expiredJobs as $expiredJob) {
                /* @var Job $expiredJob */
                $expiredJob->setStatus(Job::STATUS_EXPIRED);
                $objectManager->remove($expiredJob);
                ++$finalCount;
                ++$innerCount;
            }
            $this->flush();

            // Timings are recorded only after the removals have been flushed.
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
        }

        return $finalCount;
    }
100
101 4
    /**
     * Collects the running jobs (optionally narrowed by worker name / method) and
     * returns the ones extractStalledJobs() classifies as stalled.
     *
     * @param string|null $workerName only consider this worker when not null
     * @param string|null $method     only consider this method when not null
     *
     * @return array the stalled jobs
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $runningTotal = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);

        $filter = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($filter, $workerName, $method);

        return $this->extractStalledJobs($this->findRunningJobs($filter, $runningTotal));
    }
112
113 4
    /**
     * Pages through the jobs matching $criterion and groups them by run id.
     *
     * @param array $criterion findBy() criterion selecting the jobs
     * @param int   $count     total number of jobs to page through
     *
     * @return array jobs grouped as [runId => [job, ...]]
     */
    protected function findRunningJobs($criterion, $count)
    {
        $jobRepository = $this->getRepository();

        // Order by the first identifier field (falling back to 'id') so pages are stable.
        $identifierFields = $this->getObjectManager()->getClassMetadata($this->getJobClass())->getIdentifier();
        $orderBy = [(isset($identifierFields[0]) ? $identifierFields[0] : 'id') => 'ASC'];

        $pageSize = $this->getFetchCount($count);
        $jobsByRunId = [];
        $offset = 0;
        while ($offset < $count) {
            $page = $jobRepository->findBy($criterion, $orderBy, $pageSize, $offset);
            $offset += $pageSize;
            if (empty($page)) {
                continue;
            }
            foreach ($page as $runningJob) {
                /* @var StallableJob $runningJob */
                $jobsByRunId[$runningJob->getRunId()][] = $runningJob;
            }
        }

        return $jobsByRunId;
    }
136
137
    /**
138
     * @param $runId
139
     * @param array $jobs
140
     * @param array $stalledJobs
141
     */
142 4
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
    {
        // Look the run up in the repository of the run manager's run class.
        $objectManager = $this->getObjectManager();
        $runClass = $this->getRunManager()->getRunClass();
        $liveRun = $objectManager->getRepository($runClass)->find($runId);
        if (!$liveRun) {
            // No such run in that repository: nothing is added to $stalledJobs.
            return;
        }

        foreach ($jobs as $candidate) {
            // The job the run reports as its current one is left alone; every other job
            // attributed to this run is appended to $stalledJobs.
            if ($liveRun->getCurrentJobId() != $candidate->getId()) {
                $stalledJobs[] = $candidate;
            }
        }
    }
155
156
    /**
157
     * @param array $runningJobsById
158
     *
159
     * @return array
160
     */
161 4
    protected function extractStalledJobs(array $runningJobsById)
    {
        $collected = [];
        foreach ($runningJobsById as $runId => $jobsForRun) {
            $hasRunId = $runId || 0 === $runId;
            if ($hasRunId) {
                // Jobs tied to a run: let the live-run and run-archive checks decide.
                $this->extractStalledLiveRuns($runId, $jobsForRun, $collected);
                $this->extractStalledJobsRunArchive($runningJobsById, $collected, $runId);
            } else {
                // Jobs grouped under an empty run id are all treated as stalled.
                $collected = array_merge($collected, $jobsForRun);
            }
        }

        return $collected;
    }
175
176 4
    /**
     * Appends the jobs of $runId to $stalledJobs when the archived run ended more than
     * static::STALLED_SECONDS seconds ago.
     *
     * Does nothing when the run manager lacks getObjectManager() or getRunArchiveClass(),
     * when no archived run is found, or when the archived run has no end time.
     *
     * @param array $runningJobsById jobs grouped as [runId => [job, ...]]
     * @param array $stalledJobs     list the stalled jobs are appended to (by reference)
     * @param mixed $runId           id of the run to look up in the run archive
     */
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
    {
        $runManager = $this->getRunManager();
        $supportsArchive = method_exists($runManager, 'getObjectManager')
            && method_exists($runManager, 'getRunArchiveClass');
        if (!$supportsArchive) {
            return;
        }

        /** @var EntityRepository|DocumentRepository $archiveRepository */
        $archiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());

        /** @var Run $archivedRun */
        $archivedRun = $archiveRepository->find($runId);
        if (!$archivedRun) {
            return;
        }

        $endedAt = $archivedRun->getEndedAt();
        if (!$endedAt) {
            return;
        }

        // Did the run end more than STALLED_SECONDS ago?
        $secondsSinceEnd = time() - $endedAt->getTimestamp();
        if ($secondsSinceEnd > static::STALLED_SECONDS) {
            $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
        }
    }
198
199
    /**
200
     * @param int   $i
201
     * @param int   $count
202
     * @param int   $saveCount
203
     * @param array $stalledJobs
204
     *
205
     * @return int
206
     */
207 2
    protected function runStalledLoop($i, $count, $saveCount, array $stalledJobs)
    {
        // Process the batch [$i, $i + $saveCount), clipped to the number of jobs.
        $batchEnd = min($i + $saveCount, $count);
        $savedCount = 0;
        for ($index = $i; $index < $batchEnd; ++$index) {
            /** @var StallableJob $stalledJob */
            $stalledJob = $stalledJobs[$index];
            $stalledJob->setStatus(StallableJob::STATUS_STALLED);
            // Only jobs for which saveHistory() reports a truthy result are counted.
            $savedCount += $this->saveHistory($stalledJob) ? 1 : 0;
        }

        return $savedCount;
    }
221
222 2
    /**
     * Marks stalled jobs as stalled and saves their history, batch by batch.
     *
     * For every stalled job in a batch one JobTiming::STATUS_FINISHED_STALLED timing is
     * recorded; each batch is flushed and reported through the progress callback.
     *
     * @param string|null   $workerName       only consider this worker when not null
     * @param string|null   $method           only consider this method when not null
     * @param callable|null $progressCallback forwarded to updateProgress()
     *
     * @return int sum of the counts returned by runStalledLoop()
     */
    public function resetStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $total = count($stalledJobs);
        $this->updateProgress($progressCallback, 0, $total);

        $resetTotal = 0;
        $batchSize = $this->getSaveCount($total);
        $offset = 0;
        while ($offset < $total) {
            $batchReset = $this->runStalledLoop($offset, $total, $batchSize, $stalledJobs);

            // One timing per job in the batch, whether or not its history save counted.
            $batchEnd = min($offset + $batchSize, $total);
            for ($index = $offset; $index < $batchEnd; ++$index) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }

            $resetTotal += $batchReset;
            $this->flush();
            $this->updateProgress($progressCallback, $resetTotal, $total);
            $offset += $batchSize;
        }

        return $resetTotal;
    }
241
242
    /**
243
     * @param string        $workerName
244
     * @param string        $method
245
     * @param callable|null $progressCallback
246
     */
247 4
    public function pruneStalledJobs($workerName = null, $method = null, callable $progressCallback = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $total = count($stalledJobs);
        $this->updateProgress($progressCallback, 0, $total);

        $pruned = 0;
        $batchSize = $this->getSaveCount($total);
        $offset = 0;
        while ($offset < $total) {
            // Handle one batch, clipped to the number of stalled jobs.
            $batchEnd = min($offset + $batchSize, $total);
            for ($index = $offset; $index < $batchEnd; ++$index) {
                /** @var StallableJob $stalledJob */
                $stalledJob = $stalledJobs[$index];
                $stalledJob->setStatus(StallableJob::STATUS_STALLED);
                $stalledJob->setStalls((int) $stalledJob->getStalls() + 1);
                $this->deleteJob($stalledJob);
                ++$pruned;
            }

            // Flush once per batch, then report how many jobs have been handled so far.
            $this->flush();
            $this->updateProgress($progressCallback, $pruned, $total);
            $offset += $batchSize;
        }

        return $pruned;
    }
269
270 8
    /**
     * Deletes the job unless it is being retried.
     *
     * @param StallableJob $job   the job whose history is being saved
     * @param mixed        $retry truthy when the job is retried; in that case nothing is deleted
     *
     * @return mixed $retry, unchanged
     */
    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if ($retry) {
            return $retry;
        }

        $this->deleteJob($job);

        return $retry;
    }
278
279 43
    /**
     * Stores a hash identifying the job on it and persists the job.
     *
     * For jobs whose getBatch() is strictly true, updateNearestBatch() is consulted first;
     * when it returns a truthy value that value is returned and $job is not persisted.
     *
     * @param StallableJob $job the job to save
     *
     * @return mixed the value returned by updateNearestBatch() when truthy, otherwise $job
     */
    protected function stallableSave(StallableJob $job)
    {
        // SHA-256 over class, method, worker name and arguments (stored in the "crcHash" field).
        $hashValues = [get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs()];
        $job->setCrcHash(hash('sha256', serialize($hashValues)));

        if (true === $job->getBatch()) {
            $oldJob = $this->updateNearestBatch($job);
            if ($oldJob) {
                return $oldJob;
            }
        }

        // Just save a new job
        $this->resetSaveOk(__FUNCTION__);
        $this->persist($job);

        return $job;
    }
300
301
    /**
     * Called by stallableSave() for jobs whose getBatch() is strictly true.
     *
     * @param Job $job the batch job being saved
     *
     * @return mixed a truthy value is returned by stallableSave() in place of $job, which is
     *               then not persisted; a falsy value lets the normal save proceed.
     *               NOTE(review): presumably the already-queued job that was updated — confirm
     *               in the implementations.
     */
    abstract protected function updateNearestBatch(Job $job);
302
303
    /**
304
     * @param string $objectName
305
     */
306
    // Called by resetJobsByCriterion() with the job class before archived jobs are reset;
    // paired with restoreIdGenerator(), which is called with the same class afterwards.
    abstract protected function stopIdGenerator($objectName);
307
308
    /**
     * Counterpart of stopIdGenerator(): resetJobsByCriterion() calls it with the job class
     * once the reset archive jobs have been flushed.
     *
     * @param string $objectName the class name that was passed to stopIdGenerator()
     */
    abstract protected function restoreIdGenerator($objectName);
309
310
    /**
311
     * @param array $criterion
312
     * @param int   $limit
313
     * @param int   $offset
314
     */
315 2
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset
    ) {
        // Resets one page of archived jobs matching $criterion and returns the sum of the
        // counts reported by resetArchiveJob() for that page.
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getJobClass();
        $archiveObjectName = $this->getJobArchiveClass();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $countProcessed = 0;

        $this->stopIdGenerator($objectName);
        try {
            // Page through the archive ordered by the job class's first identifier field
            // (falling back to 'id').
            $identifierData = $metadata->getIdentifier();
            $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
            $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

            foreach ($results as $jobArchive) {
                $countProcessed += $this->resetArchiveJob($jobArchive);
            }
            $objectManager->flush();
        } finally {
            // Restore the id generator even when a query, reset or flush throws, so a failure
            // here does not leave it stopped for later saves.
            $this->restoreIdGenerator($objectName);
        }

        return $countProcessed;
    }
343
344 22
    /**
     * Hook called with the caller's function name by stallableSave() and
     * resetJobsByCriterion() before they save or reset jobs.
     *
     * @param string $function name of the calling function (callers pass __FUNCTION__)
     */
    protected function resetSaveOk($function)
    {
        // Intentionally empty in this class.
        // NOTE(review): presumably overridden by concrete managers — confirm.
    }
347
348
    /**
349
     * @param null $workerName
350
     * @param null $methodName
351
     * @param bool $prioritize
352
     */
353
    // NOTE(review): not called anywhere in this class; what is returned is defined by the
    // implementing manager — confirm there before relying on a specific type.
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
354
355
    /**
356
     * @param StallableJob $jobArchive
357
     * @param $className
358
     *
359
     * @return int Number of jobs reset
360
     */
361 2
    protected function resetArchiveJob(StallableJob $jobArchive)
    {
        $objectManager = $this->getObjectManager();

        // When updateMaxStatus() reports true, the archived job is only persisted and is not
        // copied back into the job class.
        $maxedOut = $this->updateMaxStatus(
            $jobArchive,
            StallableJob::STATUS_MAX_RETRIES,
            $jobArchive->getMaxRetries(),
            $jobArchive->getRetries()
        );
        if ($maxedOut) {
            $objectManager->persist($jobArchive);

            return 0;
        }

        // Copy the archived job into a fresh instance of the job class, reset that copy,
        // then remove the archive entry and flush.
        $jobClass = $this->getJobClass();
        /** @var StallableJob $liveJob */
        $liveJob = new $jobClass();
        Util::copy($jobArchive, $liveJob);
        $this->resetJob($liveJob);
        $objectManager->remove($jobArchive);
        $this->flush();

        return 1;
    }
380
381 8 View Code Duplication
    /**
     * Puts a job back into the "new" state, bumps its retry counter and persists it.
     *
     * @param RetryableJob $job must also be a StallableJob
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not a StallableJob
     */
    protected function resetJob(RetryableJob $job)
    {
        if ($job instanceof StallableJob) {
            // Clear the outcome of the previous attempt.
            $job->setStatus(BaseJob::STATUS_NEW);
            $job->setMessage(null);
            $job->setFinishedAt(null);
            $job->setStartedAt(null);
            $job->setElapsed(null);

            $retriesSoFar = $job->getRetries();
            $job->setRetries($retriesSoFar + 1);
            $this->persist($job);

            return true;
        }

        throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
    }
396
397
    /**
     * NOTE(review): not called within this class; presumably lists the worker names and
     * methods known to the queue — confirm the return shape in the implementations.
     */
    abstract public function getWorkersAndMethods();
398
399
    /**
     * NOTE(review): not called within this class; presumably counts non-archived jobs,
     * optionally narrowed by worker name and method — confirm in the implementations.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     */
    abstract public function countLiveJobs($workerName = null, $methodName = null);
400
401
    /**
     * NOTE(review): not called within this class; presumably archives every matching job and
     * reports progress through the callback — confirm in the implementations.
     *
     * @param string|null   $workerName
     * @param string|null   $methodName
     * @param callable|null $progressCallback
     */
    abstract public function archiveAllJobs($workerName = null, $methodName = null, callable $progressCallback = null);
402
}
403