Completed
Push — master ( 2db41f...aa6ae6 )
by Matthew
15:06 queued 36s
created

BaseJobManager::flush()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityRepository;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\Job;
11
use Dtc\QueueBundle\Model\JobTiming;
12
use Dtc\QueueBundle\Model\JobTimingManager;
13
use Dtc\QueueBundle\Model\PriorityJobManager;
14
use Dtc\QueueBundle\Model\RetryableJob;
15
use Dtc\QueueBundle\Model\Run;
16
use Dtc\QueueBundle\Model\RunManager;
17
use Dtc\QueueBundle\Util\Util;
18
19
abstract class BaseJobManager extends PriorityJobManager
20
{
21
    /** Number of jobs to prune / reset / gather at a time */
22
    const FETCH_COUNT = 100;
23
24
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
25
    const STALLED_SECONDS = 1800;
26
27
    /**
28
     * @var ObjectManager
29
     */
30
    protected $objectManager;
31
    /**
32
     * @var string
33
     */
34
    protected $jobArchiveClass;
35
36
    /**
37
     * @param string $objectName
0 ignored issues
show
Bug introduced by
There is no parameter named $objectName. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
38
     * @param string $archiveObjectName
0 ignored issues
show
Bug introduced by
There is no parameter named $archiveObjectName. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
39
     * @param string $runClass
0 ignored issues
show
Bug introduced by
There is no parameter named $runClass. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
40
     * @param string $runArchiveClass
0 ignored issues
show
Bug introduced by
There is no parameter named $runArchiveClass. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
41
     */
42 11
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, ObjectManager $objectManager,
43
        $jobClass,
44
        $jobArchiveClass)
45
    {
46 11
        $this->objectManager = $objectManager;
47 11
        $this->jobArchiveClass = $jobArchiveClass;
48 11
        parent::__construct($runManager, $jobTimingManager, $jobClass);
49 11
    }
50
51
    /**
52
     * @return ObjectManager
53
     */
54 38
    public function getObjectManager()
55
    {
56 38
        return $this->objectManager;
57
    }
58
59
    /**
60
     * @return string
61
     */
62 24
    public function getJobArchiveClass()
63
    {
64 24
        return $this->jobArchiveClass;
65
    }
66
67
    /**
68
     * @return ObjectRepository
69
     */
70 28
    public function getRepository()
71
    {
72 28
        return $this->getObjectManager()->getRepository($this->getJobClass());
73
    }
74
75
    /**
76
     * @param string $objectName
77
     */
78
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
79
80 2
    public function resetErroneousJobs($workerName = null, $method = null)
81
    {
82 2
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_ERROR, $workerName, $method);
83
84 2
        $criterion = ['status' => Job::STATUS_ERROR];
85 2
        $this->addWorkerNameMethod($criterion, $workerName, $method);
86
87 2
        $countProcessed = 0;
88 2
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
89 2
            $countProcessed += $this->resetJobsByCriterion(
90 2
                $criterion, static::FETCH_COUNT, $i);
91
        }
92
93 2
        return $countProcessed;
94
    }
95
96
    /**
97
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
98
     *
99
     * @param null $workerName
100
     * @param null $method
101
     *
102
     * @return mixed
103
     */
104
    abstract protected function updateExpired($workerName = null, $method = null);
105
106 9
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
107
    {
108 9
        if (null !== $workerName) {
109 4
            $criterion['workerName'] = $workerName;
110
        }
111 9
        if (null !== $method) {
112 4
            $criterion['method'] = $method;
113
        }
114 9
    }
115
116 3
    public function pruneExpiredJobs($workerName = null, $method = null)
117
    {
118 3
        $count = $this->updateExpired($workerName, $method);
119 3
        $criterion = ['status' => Job::STATUS_EXPIRED];
120 3
        $this->addWorkerNameMethod($criterion, $workerName, $method);
121 3
        $objectManager = $this->getObjectManager();
122 3
        $repository = $this->getRepository();
123 3
        $finalCount = 0;
124 3
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
125 3
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
126 3
            $innerCount = 0;
127 3
            if (!empty($expiredJobs)) {
128 3
                foreach ($expiredJobs as $expiredJob) {
129
                    /* @var Job $expiredJob */
130 3
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
131 3
                    $objectManager->remove($expiredJob);
132 3
                    ++$finalCount;
133 3
                    ++$innerCount;
134
                }
135
            }
136 3
            $this->flush();
137 3
            for ($j = 0; $j < $innerCount; ++$j) {
138 3
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
139
            }
140
        }
141
142 3
        return $finalCount;
143
    }
144
145 7
    protected function flush()
146
    {
147 7
        $this->getObjectManager()->flush();
148 7
    }
149
150 4
    protected function getStalledJobs($workerName = null, $method = null)
151
    {
152 4
        $count = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);
153
154 4
        $criterion = ['status' => BaseJob::STATUS_RUNNING];
155 4
        $this->addWorkerNameMethod($criterion, $workerName, $method);
156
157 4
        $runningJobs = $this->findRunningJobs($criterion, $count);
158
159 4
        return $this->extractStalledJobs($runningJobs);
160
    }
161
162 4
    protected function findRunningJobs($criterion, $count)
163
    {
164 4
        $repository = $this->getRepository();
165 4
        $runningJobsById = [];
166
167 4
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
168 4
            $runningJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
169 4
            if (!empty($runningJobs)) {
170 4
                foreach ($runningJobs as $job) {
171
                    /** @var RetryableJob $job */
172 4
                    if (null !== $runId = $job->getRunId()) {
173 4
                        $runningJobsById[$runId][] = $job;
174
                    }
175
                }
176
            }
177
        }
178
179 4
        return $runningJobsById;
180
    }
181
182
    /**
183
     * @param $runId
184
     * @param array $jobs
185
     * @param array $stalledJobs
186
     */
187 4
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
188
    {
189 4
        $objectManager = $this->getObjectManager();
190 4
        $runRepository = $objectManager->getRepository($this->getRunManager()->getRunClass());
191 4
        if ($run = $runRepository->find($runId)) {
192 2
            foreach ($jobs as $job) {
193 2
                if ($run->getCurrentJobId() == $job->getId()) {
194 2
                    continue;
195
                }
196 2
                $stalledJobs[] = $job;
197
            }
198
        }
199 4
    }
200
201
    /**
202
     * @param array $runningJobsById
203
     *
204
     * @return array
205
     */
206 4
    protected function extractStalledJobs(array $runningJobsById)
207
    {
208 4
        $stalledJobs = [];
209 4
        foreach (array_keys($runningJobsById) as $runId) {
210 4
            $this->extractStalledLiveRuns($runId, $runningJobsById[$runId], $stalledJobs);
211 4
            $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId);
212
        }
213
214 4
        return $stalledJobs;
215
    }
216
217 4
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
218
    {
219 4
        $runManager = $this->getRunManager();
220 4
        if (!method_exists($runManager, 'getObjectManager')) {
221
            return;
222
        }
223 4
        if (!method_exists($runManager, 'getRunArchiveClass')) {
224
            return;
225
        }
226
227
        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
228 4
        $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());
229
        /** @var Run $run */
230 4
        if ($run = $runArchiveRepository->find($runId)) {
231 4
            if ($endTime = $run->getEndedAt()) {
232
                // Did it end over an hour ago
233 4
                if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
234 4
                    $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
235
                }
236
            }
237
        }
238 4
    }
239
240 6
    protected function updateMaxStatus(RetryableJob $job, $status, $max = null, $count = 0)
241
    {
242 6
        if (null !== $max && $count >= $max) {
243 4
            $job->setStatus($status);
244
245 4
            return true;
246
        }
247
248 6
        return false;
249
    }
250
251 2
    protected function runStalledLoop($i, $count, array $stalledJobs, &$countProcessed)
252
    {
253 2
        $objectManager = $this->getObjectManager();
254 2
        $newCount = 0;
255 2
        for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
256
            /* RetryableJob $job */
257 2
            $job = $stalledJobs[$j];
258 2
            $job->setStalledCount($job->getStalledCount() + 1);
259 2
            if ($this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount()) ||
260 2
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_RETRIES, $job->getMaxRetries(), $job->getRetries())) {
261 2
                $objectManager->remove($job);
262 2
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
263 2
                continue;
264
            }
265
266 2
            $job->setRetries($job->getRetries() + 1);
267 2
            $job->setStatus(BaseJob::STATUS_NEW);
268 2
            $job->setLocked(null);
269 2
            $job->setLockedAt(null);
270 2
            $objectManager->persist($job);
271 2
            ++$newCount;
272 2
            ++$countProcessed;
273
        }
274
275 2
        return $newCount;
276
    }
277
278 2
    public function resetStalledJobs($workerName = null, $method = null)
279
    {
280 2
        $stalledJobs = $this->getStalledJobs($workerName, $method);
281
282 2
        $countProcessed = 0;
283 2
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
284 2
            $newCount = $this->runStalledLoop($i, $count, $stalledJobs, $countProcessed);
285 2
            $this->flush();
286 2
            for ($j = 0; $j < $newCount; ++$j) {
287 2
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
288 2
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_INSERT);
289
            }
290
        }
291
292 2
        return $countProcessed;
293
    }
294
295
    /**
296
     * @param string $workerName
297
     * @param string $method
298
     */
299 2
    public function pruneStalledJobs($workerName = null, $method = null)
300
    {
301 2
        $stalledJobs = $this->getStalledJobs($workerName, $method);
302 2
        $objectManager = $this->getObjectManager();
303
304 2
        $countProcessed = 0;
305 2
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
306 2
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
307
                /** @var RetryableJob $job */
308 2
                $job = $stalledJobs[$j];
309 2
                $job->setStalledCount($job->getStalledCount() + 1);
310 2
                $job->setStatus(BaseJob::STATUS_ERROR);
311 2
                $job->setMessage('stalled');
312 2
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount());
313 2
                $objectManager->remove($job);
314 2
                ++$countProcessed;
315
            }
316 2
            $this->flush();
317
        }
318
319 2
        return $countProcessed;
320
    }
321
322 10
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
323
    {
324 10
        $objectManager = $this->getObjectManager();
325 10
        $objectManager->remove($job);
326 10
        $objectManager->flush();
327 10
    }
328
329 2
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
330
    {
331 2
        $this->deleteJob($job); // Should cause job to be archived
332 2
    }
333
334 33
    protected function prioritySave(\Dtc\QueueBundle\Model\Job $job)
335
    {
336
        // Generate crc hash for the job
337 33
        $hashValues = array($job->getClassName(), $job->getMethod(), $job->getWorkerName(), $job->getArgs());
338 33
        $crcHash = hash('sha256', serialize($hashValues));
339 33
        $job->setCrcHash($crcHash);
340 33
        $objectManager = $this->getObjectManager();
341
342 33
        if (true === $job->getBatch()) {
343 2
            $oldJob = $this->updateNearestBatch($job);
344 2
            if ($oldJob) {
345 2
                return $oldJob;
346
            }
347
        }
348
349
        // Just save a new job
350 33
        $this->resetSaveOk(__FUNCTION__);
351 33
        $objectManager->persist($job);
352 33
        $objectManager->flush();
353
354 33
        return $job;
355
    }
356
357
    abstract protected function updateNearestBatch(Job $job);
358
359
    /**
360
     * @param string $objectName
361
     */
362
    abstract protected function stopIdGenerator($objectName);
363
364
    abstract protected function restoreIdGenerator($objectName);
365
366
    /**
367
     * @param array $criterion
368
     * @param int   $limit
369
     * @param int   $offset
370
     */
371 2
    private function resetJobsByCriterion(
372
        array $criterion,
373
        $limit,
374
        $offset)
375
    {
376 2
        $objectManager = $this->getObjectManager();
377 2
        $this->resetSaveOk(__FUNCTION__);
378 2
        $objectName = $this->getJobClass();
379 2
        $archiveObjectName = $this->getJobArchiveClass();
380 2
        $jobRepository = $objectManager->getRepository($objectName);
381 2
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
382 2
        $className = $jobRepository->getClassName();
383 2
        $metadata = $objectManager->getClassMetadata($className);
384 2
        $this->stopIdGenerator($objectName);
385 2
        $identifierData = $metadata->getIdentifier();
386 2
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
387 2
        $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);
388 2
        $countProcessed = 0;
389
390 2
        foreach ($results as $jobArchive) {
391 2
            $this->resetJob($jobArchive, $className, $countProcessed);
392
        }
393 2
        $objectManager->flush();
394
395 2
        $this->restoreIdGenerator($objectName);
396
397 2
        return $countProcessed;
398
    }
399
400 17
    protected function resetSaveOk($function)
401
    {
402 17
    }
403
404
    /**
405
     * @param null $workerName
406
     * @param null $methodName
407
     * @param bool $prioritize
408
     */
409
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
410
411
    /**
412
     * @param RetryableJob $jobArchive
413
     * @param $className
414
     * @param $countProcessed
415
     */
416 2
    protected function resetJob(RetryableJob $jobArchive, $className, &$countProcessed)
417
    {
418 2
        $objectManager = $this->getObjectManager();
419 2
        if ($this->updateMaxStatus($jobArchive, RetryableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) {
420 2
            $objectManager->persist($jobArchive);
421
422 2
            return;
423
        }
424
425
        /** @var RetryableJob $job */
426 2
        $job = new $className();
427
428 2
        Util::copy($jobArchive, $job);
429 2
        $job->setStatus(BaseJob::STATUS_NEW);
430 2
        $job->setLocked(null);
431 2
        $job->setLockedAt(null);
432 2
        $job->setMessage(null);
433 2
        $job->setFinishedAt(null);
434 2
        $job->setStartedAt(null);
435 2
        $job->setElapsed(null);
436 2
        $job->setRetries($job->getRetries() + 1);
437 2
        $objectManager->persist($job);
438 2
        $objectManager->remove($jobArchive);
439 2
        $this->jobTiminigManager->recordTiming(JobTiming::STATUS_INSERT);
440 2
        ++$countProcessed;
441 2
    }
442
}
443