Completed
Pull Request — master (#30)
by Matthew
06:35
created

DoctrineJobManager::extractStalledJobs()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 6
nc 2
nop 1
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\ODM\MongoDB\DocumentRepository;
6
use Doctrine\ORM\EntityRepository;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\RetryableJob;
9
use Dtc\QueueBundle\Model\Job;
10
use Dtc\QueueBundle\Model\JobTiming;
11
use Dtc\QueueBundle\Model\StallableJob;
12
use Dtc\QueueBundle\Model\Run;
13
use Dtc\QueueBundle\Util\Util;
14
15
abstract class DoctrineJobManager extends BaseDoctrineJobManager
16
{
17
    /** Number of jobs to prune / reset / gather at a time */
18
    const FETCH_COUNT = 100;
19
20
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
21
    const STALLED_SECONDS = 1800;
22
23
    /**
     * Counts the objects of the given class that carry the given status.
     *
     * Within this class the result is used as the upper bound of the batched
     * loops in resetExceptionJobs() and getStalledJobs().
     *
     * @param string      $objectName Class name of the job (or job archive) object to count
     * @param mixed       $status     Status to match, e.g. Job::STATUS_EXCEPTION or Job::STATUS_RUNNING
     * @param string|null $workerName Optional worker name to restrict the count to
     * @param string|null $method     Optional method to restrict the count to
     *
     * @return int Number of matching objects (callers compare it against an integer loop index)
     */
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
27
28 2
    /**
     * Resets archived jobs that are in Job::STATUS_EXCEPTION, in batches of
     * static::FETCH_COUNT.
     *
     * @param string|null $workerName Optional worker name to restrict the reset to
     * @param string|null $method     Optional method to restrict the reset to
     *
     * @return int Number of jobs that were reset
     */
    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);

        $criterion = ['status' => Job::STATUS_EXCEPTION];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $countProcessed = 0;
        // $i only bounds the number of batches. Every batch is read from offset 0:
        // an archive entry that has been reset is removed and flushed, so it no
        // longer matches $criterion. Paging with a growing offset over that
        // shrinking result set would skip entries.
        // NOTE(review): this assumes updateMaxStatus() moves entries that hit
        // their retry limit out of STATUS_EXCEPTION as well - confirm.
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $countProcessed += $this->resetJobsByCriterion(
                $criterion,
                static::FETCH_COUNT,
                0
            );
        }

        return $countProcessed;
    }
46
47
    /**
     * Marks every job that has expired with Job::STATUS_EXPIRED.
     *
     * pruneExpiredJobs() uses the return value as the number of jobs it then
     * has to remove.
     *
     * @param string|null $workerName Optional worker name to restrict the update to
     * @param string|null $method     Optional method to restrict the update to
     *
     * @return mixed Count of jobs that were marked as expired
     */
    abstract protected function updateExpired($workerName = null, $method = null);
56
57 9
    /**
     * Adds the optional worker name / method filters to a criterion array.
     *
     * The array is modified in place; a filter whose value is null is left out.
     *
     * @param array       $criterion  Criterion array to extend (passed by reference)
     * @param string|null $workerName Stored under the 'workerName' key when not null
     * @param string|null $method     Stored under the 'method' key when not null
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        $filters = ['workerName' => $workerName, 'method' => $method];
        foreach ($filters as $field => $value) {
            if (null === $value) {
                continue;
            }
            $criterion[$field] = $value;
        }
    }
66
67 3
    /**
     * Marks expired jobs as such and removes them, in batches of
     * static::FETCH_COUNT, recording a JobTiming::STATUS_FINISHED_EXPIRED
     * timing for every removed job.
     *
     * @param string|null $workerName Optional worker name to restrict pruning to
     * @param string|null $method     Optional method to restrict pruning to
     *
     * @return int Number of jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;
        // $i only bounds the number of batches. Each batch is removed and flushed
        // before the next one is fetched, so the remaining expired jobs always
        // start at offset 0; a growing offset would skip part of them.
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, 0);
            $innerCount = 0;
            if (!empty($expiredJobs)) {
                foreach ($expiredJobs as $expiredJob) {
                    /* @var Job $expiredJob */
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
                    $objectManager->remove($expiredJob);
                    ++$finalCount;
                    ++$innerCount;
                }
            }
            $this->flush();
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
            if (0 === $innerCount) {
                // Nothing left that matches; further fetches would be empty too.
                break;
            }
        }

        return $finalCount;
    }
95
96 4
    /**
     * Collects the running jobs that are considered stalled.
     *
     * Running jobs are counted, fetched grouped by run id, and then filtered
     * by extractStalledJobs().
     *
     * @param string|null $workerName Optional worker name to restrict the search to
     * @param string|null $method     Optional method to restrict the search to
     *
     * @return array List of stalled jobs
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $runningCount = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);

        $filter = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($filter, $workerName, $method);

        return $this->extractStalledJobs($this->findRunningJobs($filter, $runningCount));
    }
107
108 4
    /**
     * Fetches jobs matching the criterion in pages of static::FETCH_COUNT and
     * groups them by run id. Jobs whose run id is null are left out.
     *
     * @param array $criterion Criterion passed to the repository's findBy()
     * @param int   $count     Total number of jobs expected; bounds the paging
     *
     * @return array Map of run id => list of jobs belonging to that run
     */
    protected function findRunningJobs($criterion, $count)
    {
        $repository = $this->getRepository();
        $jobsByRunId = [];

        $offset = 0;
        while ($offset < $count) {
            $page = $repository->findBy($criterion, null, static::FETCH_COUNT, $offset);
            $offset += static::FETCH_COUNT;
            if (empty($page)) {
                continue;
            }

            foreach ($page as $job) {
                /** @var StallableJob $job */
                $runId = $job->getRunId();
                if (null === $runId) {
                    continue;
                }
                $jobsByRunId[$runId][] = $job;
            }
        }

        return $jobsByRunId;
    }
127
128
    /**
     * Appends the stalled jobs of a run that still exists in the run repository.
     *
     * When the run is found, every job except the one the run reports as its
     * current job is appended to $stalledJobs. Nothing happens when the run
     * cannot be found.
     *
     * @param mixed $runId       Id of the run to look up
     * @param array $jobs        Running jobs recorded for that run
     * @param array $stalledJobs List that stalled jobs are appended to (passed by reference)
     */
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
    {
        $runRepository = $this->getObjectManager()->getRepository($this->getRunManager()->getRunClass());
        $liveRun = $runRepository->find($runId);
        if (!$liveRun) {
            return;
        }

        foreach ($jobs as $candidate) {
            // The job the run is working on right now is not stalled.
            if ($liveRun->getCurrentJobId() != $candidate->getId()) {
                $stalledJobs[] = $candidate;
            }
        }
    }
146
147
    /**
     * Filters the running jobs down to the stalled ones.
     *
     * Each run id is checked against both the live runs and the run archive.
     *
     * @param array $runningJobsById Map of run id => list of running jobs
     *
     * @return array List of stalled jobs
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $stalled = [];
        foreach ($runningJobsById as $runId => $jobsForRun) {
            $this->extractStalledLiveRuns($runId, $jobsForRun, $stalled);
            $this->extractStalledJobsRunArchive($runningJobsById, $stalled, $runId);
        }

        return $stalled;
    }
162
163 4
    /**
     * Appends the jobs of an archived run that ended too long ago.
     *
     * If the run manager exposes both getObjectManager() and
     * getRunArchiveClass(), the run is looked up in the run archive. When it
     * has an end time more than static::STALLED_SECONDS in the past, all jobs
     * recorded for that run id are merged into $stalledJobs.
     *
     * @param array $runningJobsById Map of run id => list of running jobs
     * @param array $stalledJobs     List that stalled jobs are merged into (passed by reference)
     * @param mixed $runId           Id of the run to look up in the archive
     */
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
    {
        $runManager = $this->getRunManager();
        foreach (['getObjectManager', 'getRunArchiveClass'] as $requiredMethod) {
            if (!method_exists($runManager, $requiredMethod)) {
                return;
            }
        }

        /** @var EntityRepository|DocumentRepository $archiveRepository */
        $archiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());

        /** @var Run $archivedRun */
        $archivedRun = $archiveRepository->find($runId);
        if (!$archivedRun) {
            return;
        }

        $endedAt = $archivedRun->getEndedAt();
        if (!$endedAt) {
            return;
        }

        // Stalled only once the run ended more than STALLED_SECONDS ago.
        $secondsSinceEnd = time() - $endedAt->getTimestamp();
        if ($secondsSinceEnd > static::STALLED_SECONDS) {
            $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
        }
    }
185
186 2
    /**
     * Marks one batch of stalled jobs as stalled and saves their history.
     *
     * The batch covers indexes $i up to (excluding) $i + static::FETCH_COUNT,
     * capped at $count.
     *
     * @param int   $i           Index of the first job of the batch
     * @param int   $count       Total number of stalled jobs
     * @param array $stalledJobs Indexed list of stalled jobs
     *
     * @return int Number of jobs for which saveHistory() returned a truthy value
     */
    protected function runStalledLoop($i, $count, array $stalledJobs)
    {
        $batchEnd = min($i + static::FETCH_COUNT, $count);
        $saved = 0;
        for ($index = $i; $index < $batchEnd; ++$index) {
            /* StallableJob $job */
            $job = $stalledJobs[$index];
            $job->setStatus(StallableJob::STATUS_STALLED);
            if ($this->saveHistory($job)) {
                ++$saved;
            }
        }

        return $saved;
    }
200
201 2
    /**
     * Resets stalled jobs in batches of static::FETCH_COUNT.
     *
     * For every batch the jobs are processed by runStalledLoop(), one
     * JobTiming::STATUS_FINISHED_STALLED timing is recorded per job in the
     * batch, and the changes are flushed.
     *
     * @param string|null $workerName Optional worker name to restrict the reset to
     * @param string|null $method     Optional method to restrict the reset to
     *
     * @return int Sum of the counts returned by runStalledLoop()
     */
    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $total = count($stalledJobs);

        $countReset = 0;
        for ($start = 0; $start < $total; $start += static::FETCH_COUNT) {
            $countReset += $this->runStalledLoop($start, $total, $stalledJobs);

            // One timing per job in this batch; the last batch may be shorter.
            $batchSize = min(static::FETCH_COUNT, $total - $start);
            for ($n = 0; $n < $batchSize; ++$n) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }

            $this->flush();
        }

        return $countReset;
    }
217
218
    /**
     * Deletes stalled jobs in batches of static::FETCH_COUNT.
     *
     * Each job is marked as stalled and its stall counter is incremented
     * before it is deleted; changes are flushed after every batch.
     *
     * @param string|null $workerName Optional worker name to restrict pruning to
     * @param string|null $method     Optional method to restrict pruning to
     *
     * @return int Number of jobs processed
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $total = count($stalledJobs);

        $countProcessed = 0;
        for ($start = 0; $start < $total; $start += static::FETCH_COUNT) {
            $batchEnd = min($start + static::FETCH_COUNT, $total);
            for ($index = $start; $index < $batchEnd; ++$index) {
                /** @var StallableJob $job */
                $job = $stalledJobs[$index];
                $job->setStatus(StallableJob::STATUS_STALLED);
                $job->setStalls(intval($job->getStalls()) + 1);
                $this->deleteJob($job);
                ++$countProcessed;
            }
            $this->flush();
        }

        return $countProcessed;
    }
241
242 8
    /**
     * Deletes the job unless it is going to be retried.
     *
     * @param StallableJob $job   Job whose history is being saved
     * @param mixed        $retry Truthy when the job is retried and must be kept
     *
     * @return mixed The $retry value, unchanged
     */
    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if ($retry) {
            return $retry;
        }

        $this->deleteJob($job);

        return $retry;
    }
250
251 43
    /**
     * Stores a job, or folds it into an existing batch job.
     *
     * A SHA-256 hash over the job class, method, worker name and arguments is
     * set as the job's crc hash. For batch jobs, updateNearestBatch() is tried
     * first and its result is returned when truthy; otherwise the job is
     * persisted and flushed as a new job.
     *
     * @param StallableJob $job Job to save
     *
     * @return mixed The value returned by updateNearestBatch() when truthy, otherwise $job
     */
    protected function stallableSave(StallableJob $job)
    {
        $fingerprint = serialize([get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs()]);
        $job->setCrcHash(hash('sha256', $fingerprint));
        $objectManager = $this->getObjectManager();

        if (true === $job->getBatch()) {
            $existingJob = $this->updateNearestBatch($job);
            if ($existingJob) {
                return $existingJob;
            }
        }

        // No batch job to merge into: store this one as a new job.
        $this->resetSaveOk(__FUNCTION__);
        $objectManager->persist($job);
        $objectManager->flush();

        return $job;
    }
273
274
    /**
     * Called by stallableSave() for jobs whose getBatch() is true.
     *
     * When the return value is truthy, stallableSave() returns it instead of
     * persisting $job as a new job.
     * NOTE(review): presumably this merges $job into an existing, matching
     * batch job and returns that job - confirm against the implementations.
     *
     * @param Job $job Batch job that is about to be saved
     *
     * @return mixed Truthy replacement job, or a falsy value when $job should be saved as new
     */
    abstract protected function updateNearestBatch(Job $job);
275
276
    /**
     * Called by resetJobsByCriterion() with the job class name before archived
     * jobs are copied back into new job objects.
     * NOTE(review): presumably disables automatic id generation for that class
     * so the copied jobs keep their ids - confirm against the implementations.
     *
     * @param string $objectName Class name of the object whose id generator is stopped
     */
    abstract protected function stopIdGenerator($objectName);
280
281
    /**
     * Counterpart of stopIdGenerator(); called by resetJobsByCriterion() with
     * the same class name once the archived jobs have been processed and flushed.
     *
     * @param string $objectName Class name of the object whose id generator is restored
     */
    abstract protected function restoreIdGenerator($objectName);
282
283
    /**
     * Resets one page of archived jobs that match the criterion.
     *
     * The id generator of the job class is stopped while the page is
     * processed and is always restored afterwards, even when fetching,
     * resetting or flushing throws.
     *
     * @param array $criterion Criterion passed to the archive repository's findBy()
     * @param int   $limit     Maximum number of archived jobs to fetch
     * @param int   $offset    Offset of the first archived job to fetch
     *
     * @return int Number of jobs that were reset
     */
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset
    ) {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getJobClass();
        $archiveObjectName = $this->getJobArchiveClass();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $this->stopIdGenerator($objectName);
        $countProcessed = 0;

        try {
            // Order by the identifier so pages are stable; fall back to 'id'
            // when the metadata reports no identifier field.
            $identifierData = $metadata->getIdentifier();
            $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
            $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

            foreach ($results as $jobArchive) {
                $countProcessed += $this->resetArchiveJob($jobArchive);
            }
            $objectManager->flush();
        } finally {
            // Never leave the id generator stopped, whatever happened above.
            $this->restoreIdGenerator($objectName);
        }

        return $countProcessed;
    }
316
317 22
    /**
     * Hook invoked by stallableSave() and resetJobsByCriterion() before they
     * persist jobs. Does nothing here; subclasses may override it.
     *
     * @param string $function Name of the calling function (callers pass __FUNCTION__)
     */
    protected function resetSaveOk($function)
    {
        // Intentionally empty.
    }
320
321
    /**
     * Builds the query builder used to look up jobs.
     * NOTE(review): the exact query and return type depend on the concrete
     * (ORM / ODM) implementation - confirm there.
     *
     * @param string|null $workerName Optional worker name to restrict the query to
     * @param string|null $methodName Optional method name to restrict the query to
     * @param bool        $prioritize Whether the query should take priority into account
     */
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
327
328
    /**
     * Turns one archived job back into a new job, unless updateMaxStatus()
     * reports that its retry limit has been reached.
     *
     * In the latter case the archive entry is persisted and nothing is reset.
     * Otherwise the archive entry is copied into a fresh job object, which is
     * reset via resetJob(), and the archive entry is removed and flushed.
     *
     * @param StallableJob $jobArchive Archived job to reset
     *
     * @return int Number of jobs reset (0 or 1)
     */
    protected function resetArchiveJob(StallableJob $jobArchive)
    {
        $objectManager = $this->getObjectManager();

        // NOTE(review): static analysis reports that getMaxRetries() can return
        // an integer while updateMaxStatus() is only documented to accept null
        // for this argument - confirm the declared parameter type.
        $retryLimitReached = $this->updateMaxStatus(
            $jobArchive,
            StallableJob::STATUS_MAX_RETRIES,
            $jobArchive->getMaxRetries(),
            $jobArchive->getRetries()
        );
        if ($retryLimitReached) {
            $objectManager->persist($jobArchive);

            return 0;
        }

        $jobClass = $this->getJobClass();
        /** @var StallableJob $liveJob */
        $liveJob = new $jobClass();
        Util::copy($jobArchive, $liveJob);
        $this->resetJob($liveJob);
        $objectManager->remove($jobArchive);
        $this->flush();

        return 1;
    }
353
354 8
    /**
     * Puts a job back into its initial state and stores it.
     *
     * Status becomes BaseJob::STATUS_NEW, message and timing fields are
     * cleared, the retry counter is incremented, and the job is persisted
     * and flushed.
     *
     * @param RetryableJob $job Job to reset; must be a StallableJob
     *
     * @return bool Always true
     *
     * @throws \InvalidArgumentException When $job is not a StallableJob
     */
    protected function resetJob(RetryableJob $job)
    {
        $isStallable = $job instanceof StallableJob;
        if (!$isStallable) {
            throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
        }

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setFinishedAt(null);
        $job->setStartedAt(null);
        $job->setElapsed(null);
        $retriesSoFar = $job->getRetries();
        $job->setRetries($retriesSoFar + 1);

        $objectManager = $this->getObjectManager();
        $objectManager->persist($job);
        $this->flush();

        return true;
    }
370
371
    /**
     * NOTE(review): presumably returns the worker names and methods for which
     * jobs exist - the shape of the result is not visible here; confirm
     * against the implementations.
     */
    abstract public function getWorkersAndMethods();
372
373
    /**
     * NOTE(review): presumably counts the jobs that are still waiting to run,
     * optionally restricted to a worker and method - confirm against the
     * implementations.
     *
     * @param string|null $workerName Optional worker name filter
     * @param string|null $methodName Optional method name filter
     */
    abstract public function countLiveJobs($workerName = null, $methodName = null);
374
375
    /**
     * NOTE(review): presumably archives all jobs, optionally restricted to a
     * worker and method, reporting progress through the callback - confirm
     * against the implementations.
     *
     * NOTE(review): $progressCallback is a required parameter that follows two
     * optional ones, so the defaults can never be used positionally and PHP 8
     * reports a deprecation for this signature. Changing it affects every
     * implementing class, so it is left as is here.
     *
     * @param string|null $workerName       Optional worker name filter
     * @param string|null $methodName       Optional method name filter
     * @param mixed       $progressCallback Callback for progress reporting (required)
     */
    abstract public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback);
376
}
377