Completed
Push — master ( 58e792...f01f49 )
by Matthew
05:34
created

BaseJobManager::updateExpired()

Size

Total Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 1
ccs 0
cts 0
cp 0
c 0
b 0
f 0
nc 1
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityManager;
9
use Doctrine\ORM\EntityRepository;
10
use Dtc\QueueBundle\Model\AbstractJobManager;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\Job;
13
use Dtc\QueueBundle\Model\RetryableJob;
14
use Dtc\QueueBundle\Model\Run;
15
use Dtc\QueueBundle\Util\Util;
16
17
/**
 * Base job manager built on top of a Doctrine ObjectManager.
 *
 * Holds the names of the job, job-archive, run and run-archive classes and
 * implements the storage-agnostic parts of saving, pruning and resetting
 * jobs. Storage-specific pieces (counting, expiring, id-generator handling)
 * are left to subclasses through the abstract methods below.
 */
abstract class BaseJobManager extends AbstractJobManager
{
    /** Number of jobs to prune / reset / gather at a time */
    const FETCH_COUNT = 100;

    /** Number of seconds before a job is considered stalled if the runner is no longer active */
    const STALLED_SECONDS = 1800;

    protected $objectManager;
    protected $objectName;
    protected $archiveObjectName;
    protected $runClass;
    protected $runArchiveClass;

    /**
     * @param ObjectManager $objectManager     manager used for every repository lookup, persist, remove and flush
     * @param string        $objectName        name of the job class
     * @param string        $archiveObjectName name of the archived-job class
     * @param string        $runClass          name of the run class
     * @param string        $runArchiveClass   name of the archived-run class
     */
    public function __construct(
        ObjectManager $objectManager,
        $objectName,
        $archiveObjectName,
        $runClass,
        $runArchiveClass
    ) {
        $this->objectManager = $objectManager;
        $this->objectName = $objectName;
        $this->archiveObjectName = $archiveObjectName;
        $this->runClass = $runClass;
        $this->runArchiveClass = $runArchiveClass;
    }

    /**
     * @return ObjectManager
     */
    public function getObjectManager()
    {
        return $this->objectManager;
    }

    /**
     * @return string
     */
    public function getObjectName()
    {
        return $this->objectName;
    }

    /**
     * @return string
     */
    public function getArchiveObjectName()
    {
        return $this->archiveObjectName;
    }

    /**
     * @return string
     */
    public function getRunClass()
    {
        return $this->runClass;
    }

    /**
     * @return string
     */
    public function getRunArchiveClass()
    {
        return $this->runArchiveClass;
    }

    /**
     * Returns the repository for the (non-archived) job class.
     *
     * @return ObjectRepository
     */
    public function getRepository()
    {
        return $this->getObjectManager()->getRepository($this->getObjectName());
    }

    /**
     * Counts the jobs of $objectName that are in $status, optionally narrowed
     * to a worker name and/or method.
     *
     * The result is used as a loop bound by resetErroneousJobs() and
     * getStalledJobs(), so implementations must return a number.
     *
     * @param string      $objectName
     * @param string      $status
     * @param string|null $workerName
     * @param string|null $method
     */
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);

    /**
     * Moves archived jobs in Job::STATUS_ERROR back into the job table (or
     * marks them as having exhausted their retries), in batches of
     * FETCH_COUNT.
     *
     * @param string|null $workerName only reset jobs of this worker when not null
     * @param string|null $method     only reset jobs of this method when not null
     *
     * @return int number of jobs re-created as new jobs
     */
    public function resetErroneousJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getArchiveObjectName(), Job::STATUS_ERROR, $workerName, $method);

        $criterion = ['status' => Job::STATUS_ERROR];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $countProcessed = 0;
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            // Always read from offset 0. Every archived job handled by a batch
            // is either removed or switched to STATUS_MAX_RETRIES before the
            // flush, so it no longer matches $criterion. Advancing the offset
            // over that shrinking result set would skip every other batch.
            $countProcessed += $this->resetJobsByCriterion($criterion, static::FETCH_COUNT, 0);
        }

        return $countProcessed;
    }

    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed count of affected jobs; pruneExpiredJobs() uses it as its loop bound
     */
    abstract protected function updateExpired($workerName = null, $method = null);

    /**
     * Adds 'workerName' and/or 'method' entries to a findBy() criterion, in
     * place, for whichever of the two values is not null.
     *
     * @param array       $criterion  criterion to extend (modified by reference)
     * @param string|null $workerName
     * @param string|null $method
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        if (null !== $workerName) {
            $criterion['workerName'] = $workerName;
        }
        if (null !== $method) {
            $criterion['method'] = $method;
        }
    }

    /**
     * Marks expired jobs via updateExpired() and then removes every job in
     * Job::STATUS_EXPIRED, flushing once per batch of FETCH_COUNT.
     *
     * @param string|null $workerName only prune jobs of this worker when not null
     * @param string|null $method     only prune jobs of this method when not null
     *
     * @return int number of jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;

        // $i only bounds the number of batches. The query itself always starts
        // at offset 0 because the previous batch has already been removed and
        // flushed; paging forward over the shrinking result set would skip jobs.
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, 0);
            if (empty($expiredJobs)) {
                // Nothing left to prune; further queries would return nothing too.
                break;
            }

            foreach ($expiredJobs as $expiredJob) {
                /* @var Job $expiredJob */
                $expiredJob->setStatus(Job::STATUS_EXPIRED);
                $objectManager->remove($expiredJob);
                ++$finalCount;
            }
            $objectManager->flush();
        }

        return $finalCount;
    }

    /**
     * Returns the running jobs whose run has been archived and ended more
     * than STALLED_SECONDS ago.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return array list of stalled jobs
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getObjectName(), Job::STATUS_RUNNING, $workerName, $method);

        $criterion = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $runningJobs = $this->findRunningJobs($criterion, $count);

        return $this->extractStalledJobs($runningJobs);
    }

    /**
     * Loads the jobs matching $criterion in pages of FETCH_COUNT and groups
     * them by run id. Jobs without a run id are left out.
     *
     * @param array $criterion findBy() criterion
     * @param int   $count     total number of jobs expected; bounds the paging loop
     *
     * @return array map of run id => list of jobs belonging to that run
     */
    protected function findRunningJobs($criterion, $count)
    {
        $repository = $this->getRepository();
        $runningJobsById = [];

        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $runningJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
            if (empty($runningJobs)) {
                continue;
            }

            foreach ($runningJobs as $job) {
                /** @var RetryableJob $job */
                $runId = $job->getRunId();
                if (null !== $runId) {
                    $runningJobsById[$runId][] = $job;
                }
            }
        }

        return $runningJobsById;
    }

    /**
     * Filters a run-id => jobs map down to the jobs that count as stalled.
     *
     * A run's jobs are stalled when the run is no longer in the run
     * repository, is present in the run archive, has an end time, and that
     * end time lies more than STALLED_SECONDS in the past.
     *
     * @param array $runningJobsById map of run id => list of jobs
     *
     * @return array flat list of stalled jobs
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $objectManager = $this->getObjectManager();
        $runRepository = $objectManager->getRepository($this->runClass);
        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
        $runArchiveRepository = $objectManager->getRepository($this->runArchiveClass);

        $stalledJobs = [];
        foreach (array_keys($runningJobsById) as $runId) {
            // The run still exists in the live run table: not stalled.
            if ($runRepository->find($runId)) {
                continue;
            }

            /** @var Run $run */
            $run = $runArchiveRepository->find($runId);
            if (!$run) {
                continue;
            }

            $endTime = $run->getEndedAt();
            if (!$endTime) {
                continue;
            }

            // Did the run end more than STALLED_SECONDS ago?
            if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
                $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
            }
        }

        return $stalledJobs;
    }

    /**
     * Sets $status on the job when a maximum is configured and has been reached.
     *
     * @param RetryableJob $job
     * @param string       $status status to apply when the maximum is reached
     * @param int|null     $max    configured maximum; null means "no maximum"
     * @param int          $count  current value compared against $max
     *
     * @return bool true when the status was changed, false otherwise
     */
    protected function updateMaxStatus(RetryableJob $job, $status, $max = null, $count = 0)
    {
        if (null !== $max && $count >= $max) {
            $job->setStatus($status);

            return true;
        }

        return false;
    }

    /**
     * Puts stalled jobs back into BaseJob::STATUS_NEW so they are picked up
     * again, unless they have hit their stalled or retry maximum, in which
     * case they are removed instead.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int number of jobs reset to new (removed jobs are not counted)
     */
    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $objectManager = $this->getObjectManager();

        $countProcessed = 0;
        // One flush per batch of FETCH_COUNT jobs.
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $job) {
                /** @var RetryableJob $job */
                $job->setStalledCount($job->getStalledCount() + 1);

                // The retry limit is only checked when the stalled limit was not hit.
                $maxedOut = $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount())
                    || $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_RETRIES, $job->getMaxRetries(), $job->getRetries());
                if ($maxedOut) {
                    $objectManager->remove($job);
                    continue;
                }

                $job->setRetries($job->getRetries() + 1);
                $job->setStatus(BaseJob::STATUS_NEW);
                $job->setLocked(null);
                $job->setLockedAt(null);
                $objectManager->persist($job);
                ++$countProcessed;
            }
            $objectManager->flush();
        }

        return $countProcessed;
    }

    /**
     * Removes stalled jobs, marking each one as errored with the message
     * 'stalled' (or as STATUS_MAX_STALLED when its stalled limit is reached).
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int number of jobs removed
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $objectManager = $this->getObjectManager();

        $countProcessed = 0;
        // One flush per batch of FETCH_COUNT jobs.
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $job) {
                /** @var RetryableJob $job */
                $job->setStalledCount($job->getStalledCount() + 1);
                $job->setStatus(BaseJob::STATUS_ERROR);
                $job->setMessage('stalled');
                // May overwrite STATUS_ERROR with STATUS_MAX_STALLED.
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount());
                $objectManager->remove($job);
                ++$countProcessed;
            }
            $objectManager->flush();
        }

        return $countProcessed;
    }

    /**
     * Removes the job through the object manager and flushes immediately.
     *
     * @param Job $job
     */
    public function deleteJob(Job $job)
    {
        $objectManager = $this->getObjectManager();
        $objectManager->remove($job);
        $objectManager->flush();
    }

    /**
     * Deletes the job; per the original author's note this should cause the
     * job to be archived (the archiving itself is not visible in this class).
     *
     * @param Job $job
     */
    public function saveHistory(Job $job)
    {
        $this->deleteJob($job); // Should cause job to be archived
    }

    /**
     * Persists a job.
     *
     * A SHA-256 hash of class name, method, worker name and args is stored on
     * the job first. When the job is flagged as batch and a job with the same
     * hash is still in BaseJob::STATUS_NEW, that existing job is updated
     * (higher priority, earlier whenAt) and returned instead of saving a
     * second copy.
     *
     * @param Job $job
     *
     * @return Job the job that was actually stored: $job itself, or the existing batch job
     */
    public function save(Job $job)
    {
        // Todo: Serialize args

        // Generate crc hash for the job
        $hashValues = [$job->getClassName(), $job->getMethod(), $job->getWorkerName(), $job->getArgs()];
        $crcHash = hash('sha256', serialize($hashValues));
        $job->setCrcHash($crcHash);
        $objectManager = $this->getObjectManager();

        if (true === $job->getBatch()) {
            // See if similar job that hasn't run exists
            $criteria = ['crcHash' => $crcHash, 'status' => BaseJob::STATUS_NEW];
            $oldJob = $this->getRepository()->findOneBy($criteria);

            if ($oldJob) {
                // Old job exists - just override fields; keep the higher priority
                // and the earlier scheduled time of the two.
                $oldJob->setPriority(max($job->getPriority(), $oldJob->getPriority()));
                $oldJob->setWhenAt(min($job->getWhenAt(), $oldJob->getWhenAt()));
                $oldJob->setBatch(true);
                $objectManager->persist($oldJob);
                $objectManager->flush();

                return $oldJob;
            }
        }

        // Just save a new job
        $this->resetSaveOk(__FUNCTION__);
        $objectManager->persist($job);
        $objectManager->flush();

        return $job;
    }

    /**
     * Called before archived jobs are re-created in resetJobsByCriterion().
     * NOTE(review): presumably disables id generation so a restored job can
     * keep its archived id - confirm against the subclass implementations.
     *
     * @param string $objectName
     */
    abstract protected function stopIdGenerator($objectName);

    /**
     * Counterpart of stopIdGenerator(); called once the batch has been flushed.
     *
     * @param string $objectName
     */
    abstract protected function restoreIdGenerator($objectName);

    /**
     * Resets one page of archived jobs matching $criterion, ordered by the
     * job class's identifier column, and flushes the result.
     *
     * @param array $criterion findBy() criterion applied to the archive repository
     * @param int   $limit     maximum number of archived jobs to process
     * @param int   $offset    offset into the matching archived jobs
     *
     * @return int number of jobs re-created as new jobs
     */
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset
    ) {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getObjectName();
        $archiveObjectName = $this->getArchiveObjectName();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $this->stopIdGenerator($objectName);

        // Sort by the first identifier field, falling back to 'id' when the
        // metadata reports none.
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
        $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

        $countProcessed = 0;
        foreach ($results as $jobArchive) {
            $this->resetJob($jobArchive, $className, $countProcessed);
        }
        $objectManager->flush();

        $this->restoreIdGenerator($objectName);

        return $countProcessed;
    }

    /**
     * Hook invoked before save() persists a new job and before
     * resetJobsByCriterion() runs. Does nothing here; subclasses may override.
     *
     * @param string $function name of the calling method
     */
    protected function resetSaveOk($function)
    {
    }

    /**
     * Re-creates one archived job as a new job of $className and removes the
     * archived copy, unless the job has reached its retry maximum, in which
     * case the archived job is only marked STATUS_MAX_RETRIES.
     *
     * Nothing is flushed here; the caller flushes.
     *
     * @param RetryableJob $jobArchive     archived job to reset
     * @param string       $className      class to instantiate for the new job
     * @param int          $countProcessed incremented (by reference) when a new job is created
     */
    protected function resetJob(RetryableJob $jobArchive, $className, &$countProcessed)
    {
        $objectManager = $this->getObjectManager();
        if ($this->updateMaxStatus($jobArchive, RetryableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) {
            $objectManager->persist($jobArchive);

            return;
        }

        /** @var RetryableJob $job */
        $job = new $className();

        // Start from a copy of the archived job, then clear its run state.
        Util::copy($jobArchive, $job);
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setLocked(null);
        $job->setLockedAt(null);
        $job->setMessage(null);
        $job->setFinishedAt(null);
        $job->setStartedAt(null);
        $job->setElapsed(null);
        $job->setRetries($job->getRetries() + 1);

        $objectManager->persist($job);
        $objectManager->remove($jobArchive);
        ++$countProcessed;
    }
}
409