Completed
Pull Request — master (#30)
by Matthew
14:09 queued 11:28
created

DoctrineJobManager::stallableSaveHistory()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 4
nc 2
nop 2
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityRepository;
9
use Dtc\QueueBundle\Manager\ArchivableJobManager;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\RetryableJob;
12
use Dtc\QueueBundle\Model\Job;
13
use Dtc\QueueBundle\Model\JobTiming;
14
use Dtc\QueueBundle\Manager\JobTimingManager;
15
use Dtc\QueueBundle\Model\StallableJob;
16
use Dtc\QueueBundle\Model\Run;
17
use Dtc\QueueBundle\Manager\RunManager;
18
use Dtc\QueueBundle\Util\Util;
19
20
abstract class DoctrineJobManager extends ArchivableJobManager
21
{
22
    /** Number of jobs to prune / reset / gather at a time */
23
    const FETCH_COUNT = 100;
24
25
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
26
    const STALLED_SECONDS = 1800;
27
28
    /**
29
     * @var ObjectManager
30
     */
31
    protected $objectManager;
32
33
    /**
34
     * DoctrineJobManager constructor.
35
     *
36
     * @param RunManager       $runManager
37
     * @param JobTimingManager $jobTimingManager
38
     * @param ObjectManager    $objectManager
39
     * @param $jobClass
40
     * @param $jobArchiveClass
41
     */
42 20
    public function __construct(
43
        RunManager $runManager,
44
        JobTimingManager $jobTimingManager,
45
        ObjectManager $objectManager,
46
        $jobClass,
47
        $jobArchiveClass
48
    ) {
49 20
        $this->objectManager = $objectManager;
50 20
        parent::__construct($runManager, $jobTimingManager, $jobClass, $jobArchiveClass);
51 20
    }
52
53
    /**
54
     * @return ObjectManager
55
     */
56 57
    public function getObjectManager()
57
    {
58 57
        return $this->objectManager;
59
    }
60
61
    /**
62
     * @return ObjectRepository
63
     */
64 27
    public function getRepository()
65
    {
66 27
        return $this->getObjectManager()->getRepository($this->getJobClass());
67
    }
68
69
    /**
70
     * @param string $objectName
71
     */
72
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
73
74 2
    public function resetExceptionJobs($workerName = null, $method = null)
75
    {
76 2
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);
77
78 2
        $criterion = ['status' => Job::STATUS_EXCEPTION];
79 2
        $this->addWorkerNameMethod($criterion, $workerName, $method);
80
81 2
        $countProcessed = 0;
82 2
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
83 2
            $countProcessed += $this->resetJobsByCriterion(
84 2
                $criterion,
85 2
                static::FETCH_COUNT,
86
                $i
87 2
            );
88 2
        }
89
90 2
        return $countProcessed;
91
    }
92
93
    /**
94
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
95
     *
96
     * @param null $workerName
97
     * @param null $method
98
     *
99
     * @return mixed
100
     */
101
    abstract protected function updateExpired($workerName = null, $method = null);
102
103 9
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
104
    {
105 9
        if (null !== $workerName) {
106 4
            $criterion['workerName'] = $workerName;
107 4
        }
108 9
        if (null !== $method) {
109 4
            $criterion['method'] = $method;
110 4
        }
111 9
    }
112
113 3
    public function pruneExpiredJobs($workerName = null, $method = null)
114
    {
115 3
        $count = $this->updateExpired($workerName, $method);
116 3
        $criterion = ['status' => Job::STATUS_EXPIRED];
117 3
        $this->addWorkerNameMethod($criterion, $workerName, $method);
118 3
        $objectManager = $this->getObjectManager();
119 3
        $repository = $this->getRepository();
120 3
        $finalCount = 0;
121 3
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
122 3
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
123 3
            $innerCount = 0;
124 3
            if (!empty($expiredJobs)) {
125 3
                foreach ($expiredJobs as $expiredJob) {
126
                    /* @var Job $expiredJob */
127 3
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
128 3
                    $objectManager->remove($expiredJob);
129 3
                    ++$finalCount;
130 3
                    ++$innerCount;
131 3
                }
132 3
            }
133 3
            $this->flush();
134 3
            for ($j = 0; $j < $innerCount; ++$j) {
135 3
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
136 3
            }
137 3
        }
138
139 3
        return $finalCount;
140
    }
141
142 15
    protected function flush()
143
    {
144 15
        $this->getObjectManager()->flush();
145 15
    }
146
147 4
    protected function getStalledJobs($workerName = null, $method = null)
148
    {
149 4
        $count = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);
150
151 4
        $criterion = ['status' => BaseJob::STATUS_RUNNING];
152 4
        $this->addWorkerNameMethod($criterion, $workerName, $method);
153
154 4
        $runningJobs = $this->findRunningJobs($criterion, $count);
155
156 4
        return $this->extractStalledJobs($runningJobs);
157
    }
158
159 4
    protected function findRunningJobs($criterion, $count)
160
    {
161 4
        $repository = $this->getRepository();
162 4
        $runningJobsById = [];
163
164 4
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
165 4
            $runningJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
166 4
            if (!empty($runningJobs)) {
167 4
                foreach ($runningJobs as $job) {
168
                    /** @var StallableJob $job */
169 4
                    if (null !== $runId = $job->getRunId()) {
170 4
                        $runningJobsById[$runId][] = $job;
171 4
                    }
172 4
                }
173 4
            }
174 4
        }
175
176 4
        return $runningJobsById;
177
    }
178
179
    /**
180
     * @param $runId
181
     * @param array $jobs
182
     * @param array $stalledJobs
183
     */
184 4
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
185
    {
186 4
        $objectManager = $this->getObjectManager();
187 4
        $runRepository = $objectManager->getRepository($this->getRunManager()->getRunClass());
188 4
        if ($run = $runRepository->find($runId)) {
189 2
            foreach ($jobs as $job) {
190 2
                if ($run->getCurrentJobId() == $job->getId()) {
191 2
                    continue;
192
                }
193 2
                $stalledJobs[] = $job;
194 2
            }
195 2
        }
196 4
    }
197
198
    /**
199
     * @param array $runningJobsById
200
     *
201
     * @return array
202
     */
203 4
    protected function extractStalledJobs(array $runningJobsById)
204
    {
205 4
        $stalledJobs = [];
206 4
        foreach (array_keys($runningJobsById) as $runId) {
207 4
            $this->extractStalledLiveRuns($runId, $runningJobsById[$runId], $stalledJobs);
208 4
            $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId);
209 4
        }
210
211 4
        return $stalledJobs;
212
    }
213
214 4
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
215
    {
216 4
        $runManager = $this->getRunManager();
217 4
        if (!method_exists($runManager, 'getObjectManager')) {
218
            return;
219
        }
220 4
        if (!method_exists($runManager, 'getRunArchiveClass')) {
221
            return;
222
        }
223
224
        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
225 4
        $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());
226
        /** @var Run $run */
227 4
        if ($run = $runArchiveRepository->find($runId)) {
228 4
            if ($endTime = $run->getEndedAt()) {
229
                // Did it end over an hour ago
230 4
                if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
231 4
                    $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
232 4
                }
233 4
            }
234 4
        }
235 4
    }
236
237 2
    protected function runStalledLoop($i, $count, array $stalledJobs)
238
    {
239 2
        $resetCount = 0;
240 2
        for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
241
            /* StallableJob $job */
242 2
            $job = $stalledJobs[$j];
243 2
            $job->setStatus(StallableJob::STATUS_STALLED);
244 2
            if ($this->saveHistory($job)) {
245 2
                ++$resetCount;
246 2
            }
247 2
        }
248
249 2
        return $resetCount;
250
    }
251
252 2
    public function resetStalledJobs($workerName = null, $method = null)
253
    {
254 2
        $stalledJobs = $this->getStalledJobs($workerName, $method);
255
256 2
        $countReset = 0;
257 2
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
258 2
            $resetCount = $this->runStalledLoop($i, $count, $stalledJobs);
259 2
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
260 2
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
261 2
            }
262 2
            $countReset += $resetCount;
263 2
            $this->flush();
264 2
        }
265
266 2
        return $countReset;
267
    }
268
269
    /**
270
     * @param string $workerName
271
     * @param string $method
272
     */
273 2
    public function pruneStalledJobs($workerName = null, $method = null)
274
    {
275 2
        $stalledJobs = $this->getStalledJobs($workerName, $method);
276
277 2
        $countProcessed = 0;
278 2
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
279 2
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
280
                /** @var StallableJob $job */
281 2
                $job = $stalledJobs[$j];
282 2
                $job->setStatus(StallableJob::STATUS_STALLED);
283 2
                $job->setStalls(intval($job->getStalls()) + 1);
284 2
                $this->deleteJob($job);
285 2
                ++$countProcessed;
286 2
            }
287 2
            $this->flush();
288 2
        }
289
290 2
        return $countProcessed;
291
    }
292
293 20
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
294
    {
295 20
        $objectManager = $this->getObjectManager();
296 20
        $objectManager->remove($job);
297 20
        $objectManager->flush();
298 20
    }
299
300 8
    protected function stallableSaveHistory(StallableJob $job, $retry)
301
    {
302 8
        if (!$retry) {
303 8
            $this->deleteJob($job);
304 8
        }
305
306 8
        return $retry;
307
    }
308
309 43
    protected function stallableSave(StallableJob $job)
310
    {
311
        // Generate crc hash for the job
312 43
        $hashValues = array(get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs());
313 43
        $crcHash = hash('sha256', serialize($hashValues));
314 43
        $job->setCrcHash($crcHash);
315 43
        $objectManager = $this->getObjectManager();
316
317 43
        if (true === $job->getBatch()) {
318 2
            $oldJob = $this->updateNearestBatch($job);
319 2
            if ($oldJob) {
320 2
                return $oldJob;
321
            }
322
        }
323
324
        // Just save a new job
325 43
        $this->resetSaveOk(__FUNCTION__);
326 43
        $objectManager->persist($job);
327 43
        $objectManager->flush();
328
329 43
        return $job;
330
    }
331
332
    abstract protected function updateNearestBatch(Job $job);
333
334
    /**
335
     * @param string $objectName
336
     */
337
    abstract protected function stopIdGenerator($objectName);
338
339
    abstract protected function restoreIdGenerator($objectName);
340
341
    /**
342
     * @param array $criterion
343
     * @param int   $limit
344
     * @param int   $offset
345
     */
346 2
    private function resetJobsByCriterion(
347
        array $criterion,
348
        $limit,
349
        $offset
350
    ) {
351 2
        $objectManager = $this->getObjectManager();
352 2
        $this->resetSaveOk(__FUNCTION__);
353 2
        $objectName = $this->getJobClass();
354 2
        $archiveObjectName = $this->getJobArchiveClass();
355 2
        $jobRepository = $objectManager->getRepository($objectName);
356 2
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
357 2
        $className = $jobRepository->getClassName();
358 2
        $metadata = $objectManager->getClassMetadata($className);
359 2
        $this->stopIdGenerator($objectName);
360 2
        $identifierData = $metadata->getIdentifier();
361 2
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
362 2
        $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);
363 2
        $countProcessed = 0;
364
365 2
        foreach ($results as $jobArchive) {
366 2
            $countProcessed += $this->resetArchiveJob($jobArchive);
367 2
        }
368 2
        $objectManager->flush();
369
370 2
        $this->restoreIdGenerator($objectName);
371
372 2
        return $countProcessed;
373
    }
374
375 22
    protected function resetSaveOk($function)
376
    {
377 22
    }
378
379
    /**
380
     * @param null $workerName
381
     * @param null $methodName
382
     * @param bool $prioritize
383
     */
384
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
385
386
    /**
387
     * @param StallableJob $jobArchive
388
     * @param $className
389
     *
390
     * @return int Number of jobs reset
391
     */
392 2
    protected function resetArchiveJob(StallableJob $jobArchive)
393
    {
394 2
        $objectManager = $this->getObjectManager();
395 2
        if ($this->updateMaxStatus($jobArchive, StallableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) {
0 ignored issues
show
Bug introduced by
It seems like $jobArchive->getMaxRetries() targeting Dtc\QueueBundle\Model\Re...bleJob::getMaxRetries() can also be of type integer; however, Dtc\QueueBundle\Manager\...ager::updateMaxStatus() does only seem to accept null, maybe add an additional type check?

This check looks at variables that are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
396 2
            $objectManager->persist($jobArchive);
397
398 2
            return 0;
399
        }
400
401
        /** @var StallableJob $job */
402 2
        $className = $this->getJobClass();
403 2
        $newJob = new $className();
404 2
        Util::copy($jobArchive, $newJob);
405 2
        $this->resetJob($newJob);
406 2
        $objectManager->remove($jobArchive);
407 2
        $this->flush();
408
409 2
        return 1;
410
    }
411
412 8
    protected function resetJob(RetryableJob $job)
413
    {
414 8
        if (!$job instanceof StallableJob) {
415
            throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
416
        }
417 8
        $job->setStatus(BaseJob::STATUS_NEW);
418 8
        $job->setMessage(null);
419 8
        $job->setFinishedAt(null);
420 8
        $job->setStartedAt(null);
421 8
        $job->setElapsed(null);
422 8
        $job->setRetries($job->getRetries() + 1);
423 8
        $this->getObjectManager()->persist($job);
424 8
        $this->flush();
425
426 8
        return true;
427
    }
428
429
    abstract public function getWorkersAndMethods();
430
431
    abstract public function countLiveJobs($workerName = null, $methodName = null);
432
433
    abstract public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback);
434
}
435