Completed
Push — master ( ce55af...9dbaff )
by Matthew
04:38
created

BaseJobManager::addWorkerNameMethod()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 6
cts 6
cp 1
rs 9.6666
c 0
b 0
f 0
cc 3
eloc 5
nc 4
nop 3
crap 3
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityRepository;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\Job;
11
use Dtc\QueueBundle\Model\PriorityJobManager;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Model\Run;
14
use Dtc\QueueBundle\Util\Util;
15
16
abstract class BaseJobManager extends PriorityJobManager
17
{
18
    /** Batch size used when jobs are pruned, reset or gathered page by page. */
    const FETCH_COUNT = 100;

    /** Seconds that must have passed since a run ended before its still-running jobs are treated as stalled. */
    const STALLED_SECONDS = 1800;

    /** @var ObjectManager object manager used for every persistence operation of this manager */
    protected $objectManager;

    /** @var string name of the live job object, as handed to ObjectManager::getRepository() */
    protected $objectName;

    /** @var string name of the archived job object */
    protected $archiveObjectName;

    /** @var string name of the live run object */
    protected $runClass;

    /** @var string name of the archived run object */
    protected $runArchiveClass;
28
29
    /**
     * Stores the object manager and the names of the job / run objects this manager operates on.
     *
     * @param ObjectManager $objectManager     object manager used for persistence
     * @param string        $objectName        name of the live job object
     * @param string        $archiveObjectName name of the archived job object
     * @param string        $runClass          name of the live run object
     * @param string        $runArchiveClass   name of the archived run object
     */
    public function __construct(
        ObjectManager $objectManager,
        $objectName,
        $archiveObjectName,
        $runClass,
        $runArchiveClass
    ) {
        // Plain assignments only; no validation or lookups happen here.
        $this->objectManager = $objectManager;

        $this->objectName = $objectName;
        $this->archiveObjectName = $archiveObjectName;

        $this->runClass = $runClass;
        $this->runArchiveClass = $runArchiveClass;
    }
47
48
    /**
     * Gives access to the object manager currently used by this job manager.
     *
     * @return ObjectManager the manager set by the constructor or by setObjectManager()
     */
    public function getObjectManager()
    {
        return $this->objectManager;
    }
55
56
    /**
     * Replaces the object manager used by this job manager.
     *
     * @param ObjectManager $objectManager manager to use from now on
     */
    public function setObjectManager(ObjectManager $objectManager)
    {
        $this->objectManager = $objectManager;
    }
63
64
    /**
     * Name of the live job object, exactly as passed to the constructor.
     *
     * @return string
     */
    public function getObjectName()
    {
        return $this->objectName;
    }
71
72
    /**
     * Name of the archived job object, exactly as passed to the constructor.
     *
     * @return string
     */
    public function getArchiveObjectName()
    {
        return $this->archiveObjectName;
    }
79
80
    /**
     * Name of the live run object, exactly as passed to the constructor.
     *
     * @return string
     */
    public function getRunClass()
    {
        return $this->runClass;
    }
87
88
    /**
     * Name of the archived run object, exactly as passed to the constructor.
     *
     * @return string
     */
    public function getRunArchiveClass()
    {
        return $this->runArchiveClass;
    }
95
96
    /**
     * Looks up the repository of the live job object on the current object manager.
     *
     * @return ObjectRepository repository for the object named by getObjectName()
     */
    public function getRepository()
    {
        $objectManager = $this->getObjectManager();
        $jobObjectName = $this->getObjectName();

        return $objectManager->getRepository($jobObjectName);
    }
103
104
    /**
     * Counts the jobs of the given object that are in the given status, optionally
     * narrowed to one worker and / or method.
     *
     * Within this class the result serves as the upper bound of paged processing loops.
     *
     * @param string      $objectName name of the job object to count (live or archive)
     * @param mixed       $status     status value to match, e.g. Job::STATUS_ERROR
     * @param string|null $workerName restrict the count to this worker when not null
     * @param string|null $method     restrict the count to this method when not null
     *
     * @return int number of matching jobs (callers compare it numerically; exact type is up to the implementation)
     */
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
108
109 2
    /**
     * Re-queues archived jobs that ended in Job::STATUS_ERROR, working in batches of FETCH_COUNT.
     *
     * @param string|null $workerName only reset jobs of this worker when given
     * @param string|null $method     only reset jobs for this method when given
     *
     * @return int number of jobs that were put back as new jobs
     */
    public function resetErroneousJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getArchiveObjectName(), Job::STATUS_ERROR, $workerName, $method);

        $criterion = ['status' => Job::STATUS_ERROR];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $countProcessed = 0;
        // $i only bounds the number of batches. Each batch is read from offset 0
        // because resetJobsByCriterion() flushes after either removing an archived
        // job or moving it to the max-retries status, so processed rows no longer
        // match $criterion. Advancing the offset as well would skip FETCH_COUNT
        // untouched rows after every batch.
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $countProcessed += $this->resetJobsByCriterion($criterion, static::FETCH_COUNT, 0);
        }

        return $countProcessed;
    }
124
125
    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * pruneExpiredJobs() uses the returned value as the number of jobs it has to page through.
     *
     * @param string|null $workerName only consider jobs of this worker when given
     * @param string|null $method     only consider jobs for this method when given
     *
     * @return mixed count of expired jobs (compared numerically by the caller; exact type is up to the implementation)
     */
    abstract protected function updateExpired($workerName = null, $method = null);
134
135 9
    /**
     * Adds the optional worker-name and method filters to a criteria array, in place.
     *
     * A filter is only written when its value is not null; existing entries of
     * $criterion are otherwise left untouched.
     *
     * @param array       $criterion  criteria array to extend (modified by reference)
     * @param string|null $workerName value for the 'workerName' key
     * @param string|null $method     value for the 'method' key
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        $optionalFilters = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($optionalFilters as $field => $value) {
            if (null === $value) {
                continue;
            }
            $criterion[$field] = $value;
        }
    }
144
145 3
    /**
     * Marks expired jobs via updateExpired() and then removes them from the live
     * job storage in batches of FETCH_COUNT, flushing after every batch.
     *
     * @param string|null $workerName only prune jobs of this worker when given
     * @param string|null $method     only prune jobs for this method when given
     *
     * @return int number of jobs that were removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;
        // $i only bounds the number of batches. Every batch is removed and flushed
        // before the next query runs, so the next unprocessed jobs are always at
        // the start of the result set. Paging with a growing offset would skip
        // FETCH_COUNT still-expired jobs after each flush and leave them behind.
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT);
            if (!empty($expiredJobs)) {
                foreach ($expiredJobs as $expiredJob) {
                    /* @var Job $expiredJob */
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
                    $objectManager->remove($expiredJob);
                    ++$finalCount;
                }
            }
            $this->flush();
        }

        return $finalCount;
    }
168
169 7
    /**
     * Flushes all pending changes of the current object manager.
     */
    protected function flush()
    {
        $objectManager = $this->getObjectManager();
        $objectManager->flush();
    }
173
174 4
    /**
     * Collects the live jobs that are in the running status but are considered stalled.
     *
     * Running jobs are first gathered per run id, then filtered by extractStalledJobs().
     *
     * @param string|null $workerName only look at jobs of this worker when given
     * @param string|null $method     only look at jobs for this method when given
     *
     * @return array list of stalled jobs
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $runningTotal = $this->countJobsByStatus($this->getObjectName(), Job::STATUS_RUNNING, $workerName, $method);

        $filter = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($filter, $workerName, $method);

        return $this->extractStalledJobs($this->findRunningJobs($filter, $runningTotal));
    }
185
186 4
    /**
     * Pages through the live job repository and groups the matching jobs by run id.
     *
     * Jobs whose run id is null are left out of the result.
     *
     * @param array $criterion criteria passed to the repository's findBy()
     * @param int   $count     number of matching jobs; bounds the paging loop
     *
     * @return array map of run id => list of jobs belonging to that run
     */
    protected function findRunningJobs($criterion, $count)
    {
        $repository = $this->getRepository();
        $jobsByRunId = [];

        $offset = 0;
        while ($offset < $count) {
            $page = $repository->findBy($criterion, null, static::FETCH_COUNT, $offset);
            $offset += static::FETCH_COUNT;

            if (empty($page)) {
                continue;
            }

            foreach ($page as $job) {
                /** @var RetryableJob $job */
                $runId = $job->getRunId();
                if (null === $runId) {
                    continue;
                }
                $jobsByRunId[$runId][] = $job;
            }
        }

        return $jobsByRunId;
    }
205
206
    /**
     * Appends to $stalledJobs every job of a still-live run that is not the job
     * the run reports as its current one.
     *
     * Nothing is appended when no live run with the given id can be found.
     *
     * @param mixed $runId       id of the run the jobs belong to
     * @param array $jobs        running jobs recorded for that run
     * @param array $stalledJobs list of stalled jobs, extended by reference
     */
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
    {
        $runRepository = $this->getObjectManager()->getRepository($this->runClass);

        $run = $runRepository->find($runId);
        if (!$run) {
            return;
        }

        foreach ($jobs as $job) {
            // Loose comparison is intentional here: it mirrors how the ids have always been compared.
            if ($run->getCurrentJobId() != $job->getId()) {
                $stalledJobs[] = $job;
            }
        }
    }
224
225
    /**
     * Picks the stalled jobs out of a map of running jobs grouped by run id.
     *
     * For each run id two checks are made: extractStalledLiveRuns() handles runs
     * that are still live, and all jobs of an archived run are added when that
     * run ended more than STALLED_SECONDS ago.
     *
     * @param array $runningJobsById map of run id => list of running jobs
     *
     * @return array list of stalled jobs
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
        $runArchiveRepository = $this->getObjectManager()->getRepository($this->runArchiveClass);

        $stalledJobs = [];
        foreach ($runningJobsById as $runId => $jobsOfRun) {
            $this->extractStalledLiveRuns($runId, $jobsOfRun, $stalledJobs);

            /** @var Run $run */
            $run = $runArchiveRepository->find($runId);
            if (!$run) {
                continue;
            }

            $endTime = $run->getEndedAt();
            if (!$endTime) {
                continue;
            }

            // Did the run end more than STALLED_SECONDS ago?
            $secondsSinceEnd = time() - $endTime->getTimestamp();
            if ($secondsSinceEnd > static::STALLED_SECONDS) {
                $stalledJobs = array_merge($stalledJobs, $jobsOfRun);
            }
        }

        return $stalledJobs;
    }
252
253 6
    /**
     * Sets $status on the job when a counter has reached its configured maximum.
     *
     * A null maximum means "no limit": the job is then never touched.
     *
     * @param RetryableJob $job    job whose status may be changed
     * @param mixed        $status status to set once the limit is reached
     * @param int|null     $max    upper limit, or null for unlimited
     * @param int          $count  current value of the counter
     *
     * @return bool true when the status was set, false otherwise
     */
    protected function updateMaxStatus(RetryableJob $job, $status, $max = null, $count = 0)
    {
        $limitReached = null !== $max && $count >= $max;
        if ($limitReached) {
            $job->setStatus($status);
        }

        return $limitReached;
    }
263
264 2
    /**
     * Processes one batch (at most FETCH_COUNT entries) of stalled jobs, starting at index $i.
     *
     * Each job's stalled counter is incremented. A job that has reached its
     * maximum stalled count or maximum retries is removed; any other job gets one
     * more retry, is unlocked, set back to the new status and persisted. Nothing
     * is flushed here.
     *
     * @param int   $i              index of the first job of the batch
     * @param int   $count          total number of stalled jobs
     * @param array $stalledJobs    list of stalled jobs, indexed from 0
     * @param int   $countProcessed incremented, by reference, for every job set back to new
     */
    protected function runStalledLoop($i, $count, array $stalledJobs, &$countProcessed)
    {
        $objectManager = $this->getObjectManager();
        $batchEnd = min($i + static::FETCH_COUNT, $count);

        for ($index = $i; $index < $batchEnd; ++$index) {
            /** @var RetryableJob $job */
            $job = $stalledJobs[$index];
            $job->setStalledCount($job->getStalledCount() + 1);

            // The retries limit is only checked when the stalled limit has not been hit.
            $limitHit = $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount())
                || $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_RETRIES, $job->getMaxRetries(), $job->getRetries());
            if ($limitHit) {
                $objectManager->remove($job);
                continue;
            }

            $job->setRetries($job->getRetries() + 1);
            $job->setStatus(BaseJob::STATUS_NEW);
            $job->setLocked(null);
            $job->setLockedAt(null);
            $objectManager->persist($job);
            ++$countProcessed;
        }
    }
285
286 2
    /**
     * Puts stalled jobs back into the queue, flushing after each batch of FETCH_COUNT jobs.
     *
     * @param string|null $workerName only reset jobs of this worker when given
     * @param string|null $method     only reset jobs for this method when given
     *
     * @return int number of jobs that were set back to the new status
     */
    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $total = count($stalledJobs);

        $countProcessed = 0;
        $batchStart = 0;
        while ($batchStart < $total) {
            $this->runStalledLoop($batchStart, $total, $stalledJobs, $countProcessed);
            $this->flush();
            $batchStart += static::FETCH_COUNT;
        }

        return $countProcessed;
    }
298
299
    /**
     * Removes stalled jobs from the live job storage, flushing after each batch of FETCH_COUNT jobs.
     *
     * Before removal every job gets its stalled counter incremented, the error
     * status and the message 'stalled'; the status becomes the max-stalled status
     * instead when the job has reached its stalled limit.
     *
     * @param string|null $workerName only prune jobs of this worker when given
     * @param string|null $method     only prune jobs for this method when given
     *
     * @return int number of jobs that were removed
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $objectManager = $this->getObjectManager();
        $total = count($stalledJobs);

        $countProcessed = 0;
        for ($batchStart = 0; $batchStart < $total; $batchStart += static::FETCH_COUNT) {
            $batchEnd = min($batchStart + static::FETCH_COUNT, $total);

            for ($index = $batchStart; $index < $batchEnd; ++$index) {
                /** @var RetryableJob $job */
                $job = $stalledJobs[$index];
                $job->setStalledCount($job->getStalledCount() + 1);
                $job->setStatus(BaseJob::STATUS_ERROR);
                $job->setMessage('stalled');
                // May overwrite the error status set just above.
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount());
                $objectManager->remove($job);
                ++$countProcessed;
            }

            $this->flush();
        }

        return $countProcessed;
    }
325
326 10
    /**
     * Removes a single job through the object manager and flushes immediately.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to remove
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $manager = $this->getObjectManager();

        $manager->remove($job);
        $manager->flush();
    }
332
333 2
    /**
     * Records a finished job by deleting it from the live storage.
     *
     * NOTE(review): the removal is expected to cause the job to be archived;
     * that mechanism is not part of this class.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to move out of the live storage
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        // Should cause job to be archived
        $this->deleteJob($job);
    }
337
338 33
    /**
     * Stamps the job with its hash and stores it, unless a batch job can be merged into an existing one.
     *
     * The hash is the SHA-256 of the serialized class name, method, worker name
     * and arguments of the job. For a batch job, updateNearestBatch() is asked
     * first; when it returns a job, that job is returned and nothing new is stored.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to save
     *
     * @return mixed the job returned by updateNearestBatch(), or $job after it was persisted and flushed
     */
    protected function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        // Generate the hash that identifies this class / method / worker / arguments combination
        $job->setCrcHash(hash('sha256', serialize([
            $job->getClassName(),
            $job->getMethod(),
            $job->getWorkerName(),
            $job->getArgs(),
        ])));
        $objectManager = $this->getObjectManager();

        if (true === $job->getBatch() && ($existingJob = $this->updateNearestBatch($job))) {
            return $existingJob;
        }

        // No batch match: store the job as a new one
        $this->resetSaveOk(__FUNCTION__);
        $objectManager->persist($job);
        $objectManager->flush();

        return $job;
    }
360
361
    /**
     * Tries to fold a batch job into an already stored job.
     *
     * prioritySave() calls this for jobs whose getBatch() is true; a truthy
     * return value is handed back to the caller instead of storing $job.
     *
     * @param Job $job batch job about to be saved
     *
     * @return mixed the existing job that absorbed $job, or a falsy value when there is none
     */
    abstract protected function updateNearestBatch(Job $job);
362
363
    /**
     * Called before archived jobs are re-created as live jobs (see resetJobsByCriterion()).
     *
     * NOTE(review): presumably suspends automatic id generation for the object so
     * copied jobs keep their ids — confirm against the concrete implementations.
     *
     * @param string $objectName name of the live job object
     */
    abstract protected function stopIdGenerator($objectName);
367
368
    /**
     * Counterpart of stopIdGenerator(); called once the re-created jobs have been flushed.
     *
     * NOTE(review): presumably re-enables whatever stopIdGenerator() switched
     * off — confirm against the concrete implementations.
     *
     * @param string $objectName name of the live job object
     */
    abstract protected function restoreIdGenerator($objectName);
369
370
    /**
     * Resets one page of archived jobs matching $criterion back into the live job storage.
     *
     * The page is read from the archive repository ordered by the identifier
     * field, each entry is handed to resetJob(), and the result is flushed. The
     * id generator of the live job object is stopped for the duration of the
     * operation and is restored even when an exception is thrown.
     *
     * @param array $criterion criteria passed to the archive repository's findBy()
     * @param int   $limit     maximum number of archived jobs to process
     * @param int   $offset    offset into the matching archived jobs
     *
     * @return int number of jobs that were re-created as new jobs
     */
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset)
    {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getObjectName();
        $archiveObjectName = $this->getArchiveObjectName();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $countProcessed = 0;

        $this->stopIdGenerator($objectName);
        try {
            $identifierData = $metadata->getIdentifier();
            // Fall back to 'id' when the metadata reports no identifier field.
            $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
            $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

            foreach ($results as $jobArchive) {
                $this->resetJob($jobArchive, $className, $countProcessed);
            }
            $objectManager->flush();
        } finally {
            // Without this, a failing query, reset or flush would leave the id
            // generator stopped for every later save made through this manager.
            $this->restoreIdGenerator($objectName);
        }

        return $countProcessed;
    }
403
404 17
    /**
     * Hook invoked right before this manager saves or resets jobs.
     *
     * It does nothing in this class; it exists so subclasses can override it.
     *
     * @param string $function name of the calling method (callers pass __FUNCTION__)
     */
    protected function resetSaveOk($function)
    {
        // Intentionally empty.
    }
407
408
    /**
     * Builds the query used to select jobs, optionally narrowed to a worker and / or method.
     *
     * NOTE(review): this class never calls the method, so the exact meaning of
     * $prioritize and the returned type are defined by the implementations — confirm there.
     *
     * @param string|null $workerName restrict to this worker when not null
     * @param string|null $methodName restrict to this method when not null
     * @param bool        $prioritize presumably whether job priority is taken into account
     *
     * @return mixed query builder of the concrete persistence layer
     */
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
414
415
    /**
     * Turns one archived job back into a new live job, unless it has used up its retries.
     *
     * When the archived job has reached its maximum retries it only receives the
     * max-retries status and is persisted. Otherwise a fresh object of $className
     * is filled from the archived job via Util::copy(), its run-state fields are
     * cleared, its retry counter is incremented, and the archived job is removed.
     * Nothing is flushed here.
     *
     * @param RetryableJob $jobArchive     archived job to reset
     * @param string       $className      class to instantiate for the live job
     * @param int          $countProcessed incremented, by reference, when a live job was created
     */
    protected function resetJob(RetryableJob $jobArchive, $className, &$countProcessed)
    {
        $objectManager = $this->getObjectManager();

        $retriesExhausted = $this->updateMaxStatus(
            $jobArchive,
            RetryableJob::STATUS_MAX_RETRIES,
            $jobArchive->getMaxRetries(),
            $jobArchive->getRetries()
        );
        if ($retriesExhausted) {
            $objectManager->persist($jobArchive);

            return;
        }

        /** @var RetryableJob $liveJob */
        $liveJob = new $className();
        Util::copy($jobArchive, $liveJob);

        // Clear everything that described the previous, failed execution.
        $liveJob->setStatus(BaseJob::STATUS_NEW);
        $liveJob->setLocked(null);
        $liveJob->setLockedAt(null);
        $liveJob->setMessage(null);
        $liveJob->setFinishedAt(null);
        $liveJob->setStartedAt(null);
        $liveJob->setElapsed(null);
        $liveJob->setRetries($liveJob->getRetries() + 1);

        $objectManager->persist($liveJob);
        $objectManager->remove($jobArchive);
        ++$countProcessed;
    }
446
}
447