Completed
Branch master (38bcb9)
by Matthew
02:36
created

BaseJobManager::prioritySave()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3.1105

Importance

Changes 0
Metric Value
dl 0
loc 22
ccs 10
cts 13
cp 0.7692
rs 9.2
c 0
b 0
f 0
cc 3
eloc 13
nc 3
nop 1
crap 3.1105
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityRepository;
9
use Dtc\QueueBundle\Model\ArchivableJobManager;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\Job;
12
use Dtc\QueueBundle\Model\JobTiming;
13
use Dtc\QueueBundle\Model\JobTimingManager;
14
use Dtc\QueueBundle\Model\RetryableJob;
15
use Dtc\QueueBundle\Model\Run;
16
use Dtc\QueueBundle\Model\RunManager;
17
use Dtc\QueueBundle\Util\Util;
18
19
abstract class BaseJobManager extends ArchivableJobManager
20
{
21
    /** Number of jobs to prune / reset / gather at a time */
22
    const FETCH_COUNT = 100;
23
24
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
25
    const STALLED_SECONDS = 1800;
26
27
    /**
28
     * @var ObjectManager
29
     */
30
    protected $objectManager;
31
32
    /**
33
     * BaseJobManager constructor.
34
     *
35
     * @param RunManager       $runManager
36
     * @param JobTimingManager $jobTimingManager
37
     * @param ObjectManager    $objectManager
38
     * @param $jobClass
39
     * @param $jobArchiveClass
40
     */
41 3
    public function __construct(
42
        RunManager $runManager,
43
        JobTimingManager $jobTimingManager,
44
        ObjectManager $objectManager,
45
        $jobClass,
46
        $jobArchiveClass
47
    ) {
48 3
        $this->objectManager = $objectManager;
49 3
        parent::__construct($runManager, $jobTimingManager, $jobClass, $jobArchiveClass);
50 3
    }
51
52
    /**
53
     * @return ObjectManager
54
     */
55 3
    public function getObjectManager()
56
    {
57 3
        return $this->objectManager;
58
    }
59
60
    /**
61
     * @return ObjectRepository
62
     */
63 3
    public function getRepository()
64
    {
65 3
        return $this->getObjectManager()->getRepository($this->getJobClass());
66
    }
67
68
    /**
69
     * @param string $objectName
70
     */
71
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
72
73
    public function resetErroneousJobs($workerName = null, $method = null)
74
    {
75
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_ERROR, $workerName, $method);
76
77
        $criterion = ['status' => Job::STATUS_ERROR];
78
        $this->addWorkerNameMethod($criterion, $workerName, $method);
79
80
        $countProcessed = 0;
81
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
82
            $countProcessed += $this->resetJobsByCriterion(
83
                $criterion,
84
                static::FETCH_COUNT,
85
                $i
86
            );
87
        }
88
89
        return $countProcessed;
90
    }
91
92
    /**
93
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
94
     *
95
     * @param null $workerName
96
     * @param null $method
97
     *
98
     * @return mixed
99
     */
100
    abstract protected function updateExpired($workerName = null, $method = null);
101
102 1
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
103
    {
104 1
        if (null !== $workerName) {
105
            $criterion['workerName'] = $workerName;
106
        }
107 1
        if (null !== $method) {
108
            $criterion['method'] = $method;
109
        }
110 1
    }
111
112 1
    public function pruneExpiredJobs($workerName = null, $method = null)
113
    {
114 1
        $count = $this->updateExpired($workerName, $method);
115 1
        $criterion = ['status' => Job::STATUS_EXPIRED];
116 1
        $this->addWorkerNameMethod($criterion, $workerName, $method);
117 1
        $objectManager = $this->getObjectManager();
118 1
        $repository = $this->getRepository();
119 1
        $finalCount = 0;
120 1
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
121 1
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
122 1
            $innerCount = 0;
123 1
            if (!empty($expiredJobs)) {
124 1
                foreach ($expiredJobs as $expiredJob) {
125
                    /* @var Job $expiredJob */
126 1
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
127 1
                    $objectManager->remove($expiredJob);
128 1
                    ++$finalCount;
129 1
                    ++$innerCount;
130
                }
131
            }
132 1
            $this->flush();
133 1
            for ($j = 0; $j < $innerCount; ++$j) {
134 1
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
135
            }
136
        }
137
138 1
        return $finalCount;
139
    }
140
141 1
    protected function flush()
142
    {
143 1
        $this->getObjectManager()->flush();
144 1
    }
145
146
    protected function getStalledJobs($workerName = null, $method = null)
147
    {
148
        $count = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);
149
150
        $criterion = ['status' => BaseJob::STATUS_RUNNING];
151
        $this->addWorkerNameMethod($criterion, $workerName, $method);
152
153
        $runningJobs = $this->findRunningJobs($criterion, $count);
154
155
        return $this->extractStalledJobs($runningJobs);
156
    }
157
158
    protected function findRunningJobs($criterion, $count)
159
    {
160
        $repository = $this->getRepository();
161
        $runningJobsById = [];
162
163
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
164
            $runningJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
165
            if (!empty($runningJobs)) {
166
                foreach ($runningJobs as $job) {
167
                    /** @var RetryableJob $job */
168
                    if (null !== $runId = $job->getRunId()) {
169
                        $runningJobsById[$runId][] = $job;
170
                    }
171
                }
172
            }
173
        }
174
175
        return $runningJobsById;
176
    }
177
178
    /**
179
     * @param $runId
180
     * @param array $jobs
181
     * @param array $stalledJobs
182
     */
183
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
184
    {
185
        $objectManager = $this->getObjectManager();
186
        $runRepository = $objectManager->getRepository($this->getRunManager()->getRunClass());
187
        if ($run = $runRepository->find($runId)) {
188
            foreach ($jobs as $job) {
189
                if ($run->getCurrentJobId() == $job->getId()) {
190
                    continue;
191
                }
192
                $stalledJobs[] = $job;
193
            }
194
        }
195
    }
196
197
    /**
198
     * @param array $runningJobsById
199
     *
200
     * @return array
201
     */
202
    protected function extractStalledJobs(array $runningJobsById)
203
    {
204
        $stalledJobs = [];
205
        foreach (array_keys($runningJobsById) as $runId) {
206
            $this->extractStalledLiveRuns($runId, $runningJobsById[$runId], $stalledJobs);
207
            $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId);
208
        }
209
210
        return $stalledJobs;
211
    }
212
213
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
214
    {
215
        $runManager = $this->getRunManager();
216
        if (!method_exists($runManager, 'getObjectManager')) {
217
            return;
218
        }
219
        if (!method_exists($runManager, 'getRunArchiveClass')) {
220
            return;
221
        }
222
223
        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
224
        $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());
225
        /** @var Run $run */
226
        if ($run = $runArchiveRepository->find($runId)) {
227
            if ($endTime = $run->getEndedAt()) {
228
                // Did it end over an hour ago
229
                if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
230
                    $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
231
                }
232
            }
233
        }
234
    }
235
236
    protected function updateMaxStatus(RetryableJob $job, $status, $max = null, $count = 0)
237
    {
238
        if (null !== $max && $count >= $max) {
239
            $job->setStatus($status);
240
241
            return true;
242
        }
243
244
        return false;
245
    }
246
247
    protected function runStalledLoop($i, $count, array $stalledJobs, &$countProcessed)
248
    {
249
        $objectManager = $this->getObjectManager();
250
        $newCount = 0;
251
        for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
252
            /* RetryableJob $job */
253
            $job = $stalledJobs[$j];
254
            $job->setStalledCount($job->getStalledCount() + 1);
255
            if ($this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount()) ||
256
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_RETRIES, $job->getMaxRetries(), $job->getRetries())) {
257
                $objectManager->remove($job);
258
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
259
                continue;
260
            }
261
262
            $job->setRetries($job->getRetries() + 1);
263
            $job->setStatus(BaseJob::STATUS_NEW);
264
            $job->setLocked(null);
265
            $job->setLockedAt(null);
266
            $objectManager->persist($job);
267
            ++$newCount;
268
            ++$countProcessed;
269
        }
270
271
        return $newCount;
272
    }
273
274
    public function resetStalledJobs($workerName = null, $method = null)
275
    {
276
        $stalledJobs = $this->getStalledJobs($workerName, $method);
277
278
        $countProcessed = 0;
279
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
280
            $newCount = $this->runStalledLoop($i, $count, $stalledJobs, $countProcessed);
281
            $this->flush();
282
            for ($j = 0; $j < $newCount; ++$j) {
283
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
284
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_INSERT);
285
            }
286
        }
287
288
        return $countProcessed;
289
    }
290
291
    /**
292
     * @param string $workerName
293
     * @param string $method
294
     */
295
    public function pruneStalledJobs($workerName = null, $method = null)
296
    {
297
        $stalledJobs = $this->getStalledJobs($workerName, $method);
298
        $objectManager = $this->getObjectManager();
299
300
        $countProcessed = 0;
301
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
302
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
303
                /** @var RetryableJob $job */
304
                $job = $stalledJobs[$j];
305
                $job->setStalledCount($job->getStalledCount() + 1);
306
                $job->setStatus(BaseJob::STATUS_ERROR);
307
                $job->setMessage('stalled');
308
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount());
309
                $objectManager->remove($job);
310
                ++$countProcessed;
311
            }
312
            $this->flush();
313
        }
314
315
        return $countProcessed;
316
    }
317
318
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
319
    {
320
        $objectManager = $this->getObjectManager();
321
        $objectManager->remove($job);
322
        $objectManager->flush();
323
    }
324
325
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
326
    {
327
        $this->deleteJob($job); // Should cause job to be archived
328
    }
329
330 1
    protected function prioritySave(\Dtc\QueueBundle\Model\Job $job)
331
    {
332
        // Generate crc hash for the job
333 1
        $hashValues = array($job->getClassName(), $job->getMethod(), $job->getWorkerName(), $job->getArgs());
334 1
        $crcHash = hash('sha256', serialize($hashValues));
335 1
        $job->setCrcHash($crcHash);
336 1
        $objectManager = $this->getObjectManager();
337
338 1
        if (true === $job->getBatch()) {
339
            $oldJob = $this->updateNearestBatch($job);
340
            if ($oldJob) {
341
                return $oldJob;
342
            }
343
        }
344
345
        // Just save a new job
346 1
        $this->resetSaveOk(__FUNCTION__);
347 1
        $objectManager->persist($job);
348 1
        $objectManager->flush();
349
350 1
        return $job;
351
    }
352
353
    abstract protected function updateNearestBatch(Job $job);
354
355
    /**
356
     * @param string $objectName
357
     */
358
    abstract protected function stopIdGenerator($objectName);
359
360
    abstract protected function restoreIdGenerator($objectName);
361
362
    /**
363
     * @param array $criterion
364
     * @param int   $limit
365
     * @param int   $offset
366
     */
367
    private function resetJobsByCriterion(
368
        array $criterion,
369
        $limit,
370
        $offset
371
    ) {
372
        $objectManager = $this->getObjectManager();
373
        $this->resetSaveOk(__FUNCTION__);
374
        $objectName = $this->getJobClass();
375
        $archiveObjectName = $this->getJobArchiveClass();
376
        $jobRepository = $objectManager->getRepository($objectName);
377
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
378
        $className = $jobRepository->getClassName();
379
        $metadata = $objectManager->getClassMetadata($className);
380
        $this->stopIdGenerator($objectName);
381
        $identifierData = $metadata->getIdentifier();
382
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
383
        $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);
384
        $countProcessed = 0;
385
386
        foreach ($results as $jobArchive) {
387
            $this->resetJob($jobArchive, $className, $countProcessed);
388
        }
389
        $objectManager->flush();
390
391
        $this->restoreIdGenerator($objectName);
392
393
        return $countProcessed;
394
    }
395
396 1
    protected function resetSaveOk($function)
397
    {
398 1
    }
399
400
    /**
401
     * @param null $workerName
402
     * @param null $methodName
403
     * @param bool $prioritize
404
     */
405
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
406
407
    /**
408
     * @param RetryableJob $jobArchive
409
     * @param $className
410
     * @param $countProcessed
411
     */
412
    protected function resetJob(RetryableJob $jobArchive, $className, &$countProcessed)
413
    {
414
        $objectManager = $this->getObjectManager();
415
        if ($this->updateMaxStatus($jobArchive, RetryableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) {
416
            $objectManager->persist($jobArchive);
417
418
            return;
419
        }
420
421
        /** @var RetryableJob $job */
422
        $job = new $className();
423
424
        Util::copy($jobArchive, $job);
425
        $job->setStatus(BaseJob::STATUS_NEW);
426
        $job->setLocked(null);
427
        $job->setLockedAt(null);
428
        $job->setMessage(null);
429
        $job->setFinishedAt(null);
430
        $job->setStartedAt(null);
431
        $job->setElapsed(null);
432
        $job->setRetries($job->getRetries() + 1);
433
        $objectManager->persist($job);
434
        $objectManager->remove($jobArchive);
435
        $this->jobTiminigManager->recordTiming(JobTiming::STATUS_INSERT);
436
        ++$countProcessed;
437
    }
438
439
    abstract public function getWorkersAndMethods();
440
441
    abstract public function countLiveJobs($workerName = null, $methodName = null);
442
443
    abstract public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback);
444
}
445