Completed
Push — master ( 5fe2ea...bba9f0 )
by Matthew
13:23
created

BaseJobManager::prioritySave()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3.1105

Importance

Changes 0
Metric Value
dl 0
loc 22
ccs 10
cts 13
cp 0.7692
rs 9.2
c 0
b 0
f 0
cc 3
eloc 13
nc 3
nop 1
crap 3.1105
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityRepository;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\Job;
11
use Dtc\QueueBundle\Model\JobTiming;
12
use Dtc\QueueBundle\Model\JobTimingManager;
13
use Dtc\QueueBundle\Model\PriorityJobManager;
14
use Dtc\QueueBundle\Model\RetryableJob;
15
use Dtc\QueueBundle\Model\Run;
16
use Dtc\QueueBundle\Model\RunManager;
17
use Dtc\QueueBundle\Util\Util;
18
19
abstract class BaseJobManager extends PriorityJobManager
20
{
21
    /** Number of jobs to prune / reset / gather at a time */
22
    const FETCH_COUNT = 100;
23
24
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
25
    const STALLED_SECONDS = 1800;
26
27
    /**
28
     * @var ObjectManager
29
     */
30
    protected $objectManager;
31
    /**
32
     * @var string
33
     */
34
    protected $jobArchiveClass;
35
36
    /**
37
     * BaseJobManager constructor.
38
     *
39
     * @param RunManager       $runManager
40
     * @param JobTimingManager $jobTimingManager
41
     * @param ObjectManager    $objectManager
42
     * @param $jobClass
43
     * @param $jobArchiveClass
44
     */
45 4
    public function __construct(
46
        RunManager $runManager,
47
        JobTimingManager $jobTimingManager,
48
        ObjectManager $objectManager,
49
        $jobClass,
50
        $jobArchiveClass
51
    ) {
52 4
        $this->objectManager = $objectManager;
53 4
        $this->jobArchiveClass = $jobArchiveClass;
54 4
        parent::__construct($runManager, $jobTimingManager, $jobClass);
55 4
    }
56
57
    /**
58
     * @return ObjectManager
59
     */
60 4
    public function getObjectManager()
61
    {
62 4
        return $this->objectManager;
63
    }
64
65
    /**
66
     * @return string
67
     */
68 4
    public function getJobArchiveClass()
69
    {
70 4
        return $this->jobArchiveClass;
71
    }
72
73
    /**
74
     * @return ObjectRepository
75
     */
76 4
    public function getRepository()
77
    {
78 4
        return $this->getObjectManager()->getRepository($this->getJobClass());
79
    }
80
81
    /**
82
     * @param string $objectName
83
     */
84
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
85
86
    public function resetErroneousJobs($workerName = null, $method = null)
87
    {
88
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_ERROR, $workerName, $method);
89
90
        $criterion = ['status' => Job::STATUS_ERROR];
91
        $this->addWorkerNameMethod($criterion, $workerName, $method);
92
93
        $countProcessed = 0;
94
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
95
            $countProcessed += $this->resetJobsByCriterion(
96
                $criterion,
97
                static::FETCH_COUNT,
98
                $i
99
            );
100
        }
101
102
        return $countProcessed;
103
    }
104
105
    /**
106
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
107
     *
108
     * @param null $workerName
109
     * @param null $method
110
     *
111
     * @return mixed
112
     */
113
    abstract protected function updateExpired($workerName = null, $method = null);
114
115 1
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
116
    {
117 1
        if (null !== $workerName) {
118
            $criterion['workerName'] = $workerName;
119
        }
120 1
        if (null !== $method) {
121
            $criterion['method'] = $method;
122
        }
123 1
    }
124
125 1
    public function pruneExpiredJobs($workerName = null, $method = null)
126
    {
127 1
        $count = $this->updateExpired($workerName, $method);
128 1
        $criterion = ['status' => Job::STATUS_EXPIRED];
129 1
        $this->addWorkerNameMethod($criterion, $workerName, $method);
130 1
        $objectManager = $this->getObjectManager();
131 1
        $repository = $this->getRepository();
132 1
        $finalCount = 0;
133 1
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
134 1
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
135 1
            $innerCount = 0;
136 1
            if (!empty($expiredJobs)) {
137 1
                foreach ($expiredJobs as $expiredJob) {
138
                    /* @var Job $expiredJob */
139 1
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
140 1
                    $objectManager->remove($expiredJob);
141 1
                    ++$finalCount;
142 1
                    ++$innerCount;
143
                }
144
            }
145 1
            $this->flush();
146 1
            for ($j = 0; $j < $innerCount; ++$j) {
147 1
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
148
            }
149
        }
150
151 1
        return $finalCount;
152
    }
153
154 1
    protected function flush()
155
    {
156 1
        $this->getObjectManager()->flush();
157 1
    }
158
159
    protected function getStalledJobs($workerName = null, $method = null)
160
    {
161
        $count = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);
162
163
        $criterion = ['status' => BaseJob::STATUS_RUNNING];
164
        $this->addWorkerNameMethod($criterion, $workerName, $method);
165
166
        $runningJobs = $this->findRunningJobs($criterion, $count);
167
168
        return $this->extractStalledJobs($runningJobs);
169
    }
170
171
    protected function findRunningJobs($criterion, $count)
172
    {
173
        $repository = $this->getRepository();
174
        $runningJobsById = [];
175
176
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
177
            $runningJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
178
            if (!empty($runningJobs)) {
179
                foreach ($runningJobs as $job) {
180
                    /** @var RetryableJob $job */
181
                    if (null !== $runId = $job->getRunId()) {
182
                        $runningJobsById[$runId][] = $job;
183
                    }
184
                }
185
            }
186
        }
187
188
        return $runningJobsById;
189
    }
190
191
    /**
192
     * @param $runId
193
     * @param array $jobs
194
     * @param array $stalledJobs
195
     */
196
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
197
    {
198
        $objectManager = $this->getObjectManager();
199
        $runRepository = $objectManager->getRepository($this->getRunManager()->getRunClass());
200
        if ($run = $runRepository->find($runId)) {
201
            foreach ($jobs as $job) {
202
                if ($run->getCurrentJobId() == $job->getId()) {
203
                    continue;
204
                }
205
                $stalledJobs[] = $job;
206
            }
207
        }
208
    }
209
210
    /**
211
     * @param array $runningJobsById
212
     *
213
     * @return array
214
     */
215
    protected function extractStalledJobs(array $runningJobsById)
216
    {
217
        $stalledJobs = [];
218
        foreach (array_keys($runningJobsById) as $runId) {
219
            $this->extractStalledLiveRuns($runId, $runningJobsById[$runId], $stalledJobs);
220
            $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId);
221
        }
222
223
        return $stalledJobs;
224
    }
225
226
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
227
    {
228
        $runManager = $this->getRunManager();
229
        if (!method_exists($runManager, 'getObjectManager')) {
230
            return;
231
        }
232
        if (!method_exists($runManager, 'getRunArchiveClass')) {
233
            return;
234
        }
235
236
        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
237
        $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());
238
        /** @var Run $run */
239
        if ($run = $runArchiveRepository->find($runId)) {
240
            if ($endTime = $run->getEndedAt()) {
241
                // Did it end over an hour ago
242
                if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
243
                    $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
244
                }
245
            }
246
        }
247
    }
248
249
    protected function updateMaxStatus(RetryableJob $job, $status, $max = null, $count = 0)
250
    {
251
        if (null !== $max && $count >= $max) {
252
            $job->setStatus($status);
253
254
            return true;
255
        }
256
257
        return false;
258
    }
259
260
    protected function runStalledLoop($i, $count, array $stalledJobs, &$countProcessed)
261
    {
262
        $objectManager = $this->getObjectManager();
263
        $newCount = 0;
264
        for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
265
            /* RetryableJob $job */
266
            $job = $stalledJobs[$j];
267
            $job->setStalledCount($job->getStalledCount() + 1);
268
            if ($this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount()) ||
269
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_RETRIES, $job->getMaxRetries(), $job->getRetries())) {
270
                $objectManager->remove($job);
271
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
272
                continue;
273
            }
274
275
            $job->setRetries($job->getRetries() + 1);
276
            $job->setStatus(BaseJob::STATUS_NEW);
277
            $job->setLocked(null);
278
            $job->setLockedAt(null);
279
            $objectManager->persist($job);
280
            ++$newCount;
281
            ++$countProcessed;
282
        }
283
284
        return $newCount;
285
    }
286
287
    public function resetStalledJobs($workerName = null, $method = null)
288
    {
289
        $stalledJobs = $this->getStalledJobs($workerName, $method);
290
291
        $countProcessed = 0;
292
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
293
            $newCount = $this->runStalledLoop($i, $count, $stalledJobs, $countProcessed);
294
            $this->flush();
295
            for ($j = 0; $j < $newCount; ++$j) {
296
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
297
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_INSERT);
298
            }
299
        }
300
301
        return $countProcessed;
302
    }
303
304
    /**
305
     * @param string $workerName
306
     * @param string $method
307
     */
308
    public function pruneStalledJobs($workerName = null, $method = null)
309
    {
310
        $stalledJobs = $this->getStalledJobs($workerName, $method);
311
        $objectManager = $this->getObjectManager();
312
313
        $countProcessed = 0;
314
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
315
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
316
                /** @var RetryableJob $job */
317
                $job = $stalledJobs[$j];
318
                $job->setStalledCount($job->getStalledCount() + 1);
319
                $job->setStatus(BaseJob::STATUS_ERROR);
320
                $job->setMessage('stalled');
321
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount());
322
                $objectManager->remove($job);
323
                ++$countProcessed;
324
            }
325
            $this->flush();
326
        }
327
328
        return $countProcessed;
329
    }
330
331 1
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
332
    {
333 1
        $objectManager = $this->getObjectManager();
334 1
        $objectManager->remove($job);
335 1
        $objectManager->flush();
336 1
    }
337
338 1
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
339
    {
340 1
        $this->deleteJob($job); // Should cause job to be archived
341 1
    }
342
343 2
    protected function prioritySave(\Dtc\QueueBundle\Model\Job $job)
344
    {
345
        // Generate crc hash for the job
346 2
        $hashValues = array($job->getClassName(), $job->getMethod(), $job->getWorkerName(), $job->getArgs());
347 2
        $crcHash = hash('sha256', serialize($hashValues));
348 2
        $job->setCrcHash($crcHash);
349 2
        $objectManager = $this->getObjectManager();
350
351 2
        if (true === $job->getBatch()) {
352
            $oldJob = $this->updateNearestBatch($job);
353
            if ($oldJob) {
354
                return $oldJob;
355
            }
356
        }
357
358
        // Just save a new job
359 2
        $this->resetSaveOk(__FUNCTION__);
360 2
        $objectManager->persist($job);
361 2
        $objectManager->flush();
362
363 2
        return $job;
364
    }
365
366
    abstract protected function updateNearestBatch(Job $job);
367
368
    /**
369
     * @param string $objectName
370
     */
371
    abstract protected function stopIdGenerator($objectName);
372
373
    abstract protected function restoreIdGenerator($objectName);
374
375
    /**
376
     * @param array $criterion
377
     * @param int   $limit
378
     * @param int   $offset
379
     */
380
    private function resetJobsByCriterion(
381
        array $criterion,
382
        $limit,
383
        $offset
384
    ) {
385
        $objectManager = $this->getObjectManager();
386
        $this->resetSaveOk(__FUNCTION__);
387
        $objectName = $this->getJobClass();
388
        $archiveObjectName = $this->getJobArchiveClass();
389
        $jobRepository = $objectManager->getRepository($objectName);
390
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
391
        $className = $jobRepository->getClassName();
392
        $metadata = $objectManager->getClassMetadata($className);
393
        $this->stopIdGenerator($objectName);
394
        $identifierData = $metadata->getIdentifier();
395
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
396
        $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);
397
        $countProcessed = 0;
398
399
        foreach ($results as $jobArchive) {
400
            $this->resetJob($jobArchive, $className, $countProcessed);
401
        }
402
        $objectManager->flush();
403
404
        $this->restoreIdGenerator($objectName);
405
406
        return $countProcessed;
407
    }
408
409 1
    protected function resetSaveOk($function)
410
    {
411 1
    }
412
413
    /**
414
     * @param null $workerName
415
     * @param null $methodName
416
     * @param bool $prioritize
417
     */
418
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
419
420
    /**
421
     * @param RetryableJob $jobArchive
422
     * @param $className
423
     * @param $countProcessed
424
     */
425
    protected function resetJob(RetryableJob $jobArchive, $className, &$countProcessed)
426
    {
427
        $objectManager = $this->getObjectManager();
428
        if ($this->updateMaxStatus($jobArchive, RetryableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) {
429
            $objectManager->persist($jobArchive);
430
431
            return;
432
        }
433
434
        /** @var RetryableJob $job */
435
        $job = new $className();
436
437
        Util::copy($jobArchive, $job);
438
        $job->setStatus(BaseJob::STATUS_NEW);
439
        $job->setLocked(null);
440
        $job->setLockedAt(null);
441
        $job->setMessage(null);
442
        $job->setFinishedAt(null);
443
        $job->setStartedAt(null);
444
        $job->setElapsed(null);
445
        $job->setRetries($job->getRetries() + 1);
446
        $objectManager->persist($job);
447
        $objectManager->remove($jobArchive);
448
        $this->jobTiminigManager->recordTiming(JobTiming::STATUS_INSERT);
449
        ++$countProcessed;
450
    }
451
452
    abstract public function getWorkersAndMethods();
453
454
    abstract public function countLiveJobs($workerName = null, $methodName = null);
455
456
    abstract public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback);
457
}
458