Completed
Push — master ( a454e0...d0194b )
by Matthew
07:25
created

BaseJobManager::countJobsByStatus()

Size

Total Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 1
ccs 0
cts 0
cp 0
c 0
b 0
f 0
nc 1
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityRepository;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\Job;
11
use Dtc\QueueBundle\Model\PriorityJobManager;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Model\Run;
14
use Dtc\QueueBundle\Util\Util;
15
16
abstract class BaseJobManager extends PriorityJobManager
17
{
18
    /** Batch size used when pruning, resetting, or gathering jobs */
    const FETCH_COUNT = 100;

    /**
     * Seconds that must have passed since a job's (no longer active) run ended
     * before the job is treated as stalled.
     */
    const STALLED_SECONDS = 1800;

    /** @var ObjectManager Doctrine object manager used for all persistence operations */
    protected $objectManager;

    /** @var string Name of the job class handled by this manager */
    protected $objectName;

    /** @var string Name of the archived-job class */
    protected $archiveObjectName;

    /** @var string Name of the run class */
    protected $runClass;

    /** @var string Name of the archived-run class */
    protected $runArchiveClass;
28
29
    /**
     * Stores the object manager and the class names this manager works with.
     *
     * @param ObjectManager $objectManager     Doctrine object manager
     * @param string        $objectName        job class name
     * @param string        $archiveObjectName archived-job class name
     * @param string        $runClass          run class name
     * @param string        $runArchiveClass   archived-run class name
     */
    public function __construct(
        ObjectManager $objectManager,
        $objectName,
        $archiveObjectName,
        $runClass,
        $runArchiveClass
    ) {
        $this->objectManager = $objectManager;

        // Job classes
        $this->objectName = $objectName;
        $this->archiveObjectName = $archiveObjectName;

        // Run classes
        $this->runClass = $runClass;
        $this->runArchiveClass = $runArchiveClass;
    }
47
48
    /**
     * Returns the object manager handed to the constructor.
     *
     * @return ObjectManager
     */
    public function getObjectManager()
    {
        $manager = $this->objectManager;

        return $manager;
    }
55
56
    /**
     * Returns the job class name handed to the constructor.
     *
     * @return string
     */
    public function getObjectName()
    {
        $name = $this->objectName;

        return $name;
    }
63
64
    /**
     * Returns the archived-job class name handed to the constructor.
     *
     * @return string
     */
    public function getArchiveObjectName()
    {
        $name = $this->archiveObjectName;

        return $name;
    }
71
72
    /**
     * Returns the run class name handed to the constructor.
     *
     * @return string
     */
    public function getRunClass()
    {
        $class = $this->runClass;

        return $class;
    }
79
80
    /**
     * Returns the archived-run class name handed to the constructor.
     *
     * @return string
     */
    public function getRunArchiveClass()
    {
        $class = $this->runArchiveClass;

        return $class;
    }
87
88
    /**
     * Returns the repository for the job class (see getObjectName()).
     *
     * @return ObjectRepository
     */
    public function getRepository()
    {
        $manager = $this->getObjectManager();
        $jobClass = $this->getObjectName();

        return $manager->getRepository($jobClass);
    }
95
96
    /**
     * Counts jobs of the given class that have the given status.
     *
     * Callers in this class use the result as the upper bound of their batched loops.
     *
     * @param string      $objectName class name of the jobs to count (live or archive)
     * @param mixed       $status     status to match; callers pass Job::STATUS_* constants
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int presumably an integer count — confirm against implementations
     */
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
100
101 2
    /**
     * Re-queues archived jobs whose status is Job::STATUS_ERROR, in batches of FETCH_COUNT.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int number of jobs that were re-queued
     */
    public function resetErroneousJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getArchiveObjectName(), Job::STATUS_ERROR, $workerName, $method);

        $criterion = ['status' => Job::STATUS_ERROR];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $countProcessed = 0;
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            // Always read from offset 0: every archived job in a processed batch is either
            // removed or moved to a different status, so it no longer matches the criterion.
            // Advancing the offset would skip jobs that have not been processed yet.
            $countProcessed += $this->resetJobsByCriterion($criterion, static::FETCH_COUNT, 0);
        }

        return $countProcessed;
    }
116
117
    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * pruneExpiredJobs() uses the return value as the upper bound of its batched loop.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int presumably the number of jobs marked as expired — confirm against implementations
     */
    abstract protected function updateExpired($workerName = null, $method = null);
126
127 9
    /**
     * Adds 'workerName' and/or 'method' entries to a criteria array, skipping values that are null.
     *
     * @param array       $criterion  criteria array, modified in place
     * @param string|null $workerName value for the 'workerName' key, or null to leave it out
     * @param string|null $method     value for the 'method' key, or null to leave it out
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        $filters = ['workerName' => $workerName, 'method' => $method];

        foreach ($filters as $field => $value) {
            if (null === $value) {
                continue;
            }
            $criterion[$field] = $value;
        }
    }
136
137 3
    /**
     * Marks expired jobs via updateExpired() and then removes them in batches of FETCH_COUNT.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int number of jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            // Always read from offset 0: the previous batch has been removed and flushed, so the
            // remaining expired jobs start at the front of the result set. Paging with $i here
            // would skip FETCH_COUNT unprocessed jobs on every pass.
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, 0);
            if (!empty($expiredJobs)) {
                foreach ($expiredJobs as $expiredJob) {
                    /* @var Job $expiredJob */
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
                    $objectManager->remove($expiredJob);
                    ++$finalCount;
                }
            }
            $this->flush();
        }

        return $finalCount;
    }
160
161 7
    /**
     * Flushes pending changes on the object manager.
     */
    protected function flush()
    {
        $manager = $this->getObjectManager();
        $manager->flush();
    }
165
166 4
    /**
     * Collects running jobs whose run has ended long enough ago to be considered stalled.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return array list of stalled jobs (see extractStalledJobs())
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $jobClass = $this->getObjectName();
        $total = $this->countJobsByStatus($jobClass, Job::STATUS_RUNNING, $workerName, $method);

        $criterion = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $jobsByRunId = $this->findRunningJobs($criterion, $total);

        return $this->extractStalledJobs($jobsByRunId);
    }
177
178 4
    /**
     * Fetches jobs matching the criterion in pages of FETCH_COUNT and groups them by run id.
     *
     * Jobs whose run id is null are left out of the result.
     *
     * @param array $criterion findBy() criteria
     * @param int   $count     number of matching jobs; bounds the paging loop
     *
     * @return array map of run id => list of jobs
     */
    protected function findRunningJobs($criterion, $count)
    {
        $repository = $this->getRepository();
        $grouped = [];

        for ($offset = 0; $offset < $count; $offset += static::FETCH_COUNT) {
            $page = $repository->findBy($criterion, null, static::FETCH_COUNT, $offset);
            if (empty($page)) {
                continue;
            }

            foreach ($page as $job) {
                /** @var RetryableJob $job */
                $runId = $job->getRunId();
                if (null === $runId) {
                    continue;
                }
                $grouped[$runId][] = $job;
            }
        }

        return $grouped;
    }
197
198 4
    /**
     * Picks out the jobs whose run is no longer live and ended more than STALLED_SECONDS ago.
     *
     * A run id that is still found in the run repository is skipped. Otherwise the archived run
     * is looked up, and its jobs count as stalled only if it has an end time older than
     * STALLED_SECONDS. Run ids found in neither repository contribute nothing.
     *
     * @param array $runningJobsById map of run id => list of jobs
     *
     * @return array flat list of stalled jobs
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $objectManager = $this->getObjectManager();
        $runRepository = $objectManager->getRepository($this->runClass);
        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
        $runArchiveRepository = $objectManager->getRepository($this->runArchiveClass);

        $stalledJobs = [];
        foreach (array_keys($runningJobsById) as $runId) {
            // Run still exists in the live run repository: not stalled
            if ($runRepository->find($runId)) {
                continue;
            }

            /** @var Run $run */
            $run = $runArchiveRepository->find($runId);
            if (!$run) {
                continue;
            }

            $endTime = $run->getEndedAt();
            if (!$endTime) {
                continue;
            }

            // Did the run end more than STALLED_SECONDS ago?
            $secondsSinceEnd = time() - $endTime->getTimestamp();
            if ($secondsSinceEnd > static::STALLED_SECONDS) {
                $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
            }
        }

        return $stalledJobs;
    }
223
224 6
    /**
     * Sets the given status on the job when a counter has reached its configured maximum.
     *
     * @param RetryableJob $job    job to update
     * @param mixed        $status status to set when the limit is reached
     * @param int|null     $max    limit; null means no limit applies
     * @param int          $count  current counter value
     *
     * @return bool true if the status was set, false otherwise
     */
    protected function updateMaxStatus(RetryableJob $job, $status, $max = null, $count = 0)
    {
        $limitReached = null !== $max && $count >= $max;
        if ($limitReached) {
            $job->setStatus($status);
        }

        return $limitReached;
    }
234
235 2
    /**
     * Puts stalled jobs back into Job status NEW, flushing after every FETCH_COUNT jobs.
     *
     * Each job's stalled count is incremented first. A job that has reached its maximum stalled
     * count or maximum retries is removed instead of being reset, and is not counted.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int number of jobs that were reset
     */
    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $objectManager = $this->getObjectManager();

        $countProcessed = 0;
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $job) {
                /** @var RetryableJob $job */
                $job->setStalledCount($job->getStalledCount() + 1);

                // The retries check only runs when the stalled limit has not been reached
                $limitReached = $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount())
                    || $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_RETRIES, $job->getMaxRetries(), $job->getRetries());
                if ($limitReached) {
                    $objectManager->remove($job);
                    continue;
                }

                $job->setRetries($job->getRetries() + 1);
                $job->setStatus(BaseJob::STATUS_NEW);
                $job->setLocked(null);
                $job->setLockedAt(null);
                $objectManager->persist($job);
                ++$countProcessed;
            }
            $this->flush();
        }

        return $countProcessed;
    }
267
268
    /**
     * Removes stalled jobs, flushing after every FETCH_COUNT jobs.
     *
     * Before removal each job gets its stalled count incremented, status ERROR and the message
     * 'stalled'; the status becomes MAX_STALLED instead when the stalled limit has been reached.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int number of jobs removed
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $objectManager = $this->getObjectManager();

        $countProcessed = 0;
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $job) {
                /** @var RetryableJob $job */
                $job->setStalledCount($job->getStalledCount() + 1);
                $job->setStatus(BaseJob::STATUS_ERROR);
                $job->setMessage('stalled');
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount());
                $objectManager->remove($job);
                ++$countProcessed;
            }
            $this->flush();
        }

        return $countProcessed;
    }
294
295 10
    /**
     * Removes the job through the object manager and flushes immediately.
     *
     * @param Job $job job to delete
     */
    public function deleteJob(Job $job)
    {
        $manager = $this->getObjectManager();

        $manager->remove($job);
        $manager->flush();
    }
301
302 2
    /**
     * Records the job's history by deleting it.
     *
     * @param Job $job job to move into history
     */
    public function saveHistory(Job $job)
    {
        // Should cause job to be archived
        $this->deleteJob($job);
    }
306
307 31
    /**
     * Stamps the job with a content hash and persists it, unless a batch job can be merged
     * into an existing one.
     *
     * The hash is the SHA-256 of the serialized class name, method, worker name and arguments.
     *
     * @param Job $job job to save
     *
     * @return Job the result of updateNearestBatch() when it is truthy for a batch job,
     *             otherwise the newly persisted job
     */
    protected function prioritySave(Job $job)
    {
        // Generate the hash that identifies equivalent jobs
        $fingerprint = serialize([
            $job->getClassName(),
            $job->getMethod(),
            $job->getWorkerName(),
            $job->getArgs(),
        ]);
        $job->setCrcHash(hash('sha256', $fingerprint));
        $objectManager = $this->getObjectManager();

        if (true === $job->getBatch() && ($oldJob = $this->updateNearestBatch($job))) {
            return $oldJob;
        }

        // Just save a new job
        $this->resetSaveOk(__FUNCTION__);
        $objectManager->persist($job);
        $objectManager->flush();

        return $job;
    }
329
330
    /**
     * Called by prioritySave() for jobs whose getBatch() is true.
     *
     * A truthy return value is handed back to the caller in place of saving the new job;
     * a falsy one lets prioritySave() persist the job as usual.
     *
     * @param Job $job the batch job about to be saved
     *
     * @return Job|null presumably the existing job that absorbed the batch, if any — confirm
     */
    abstract protected function updateNearestBatch(Job $job);
331
332
    /**
     * Called by resetJobsByCriterion() before archived jobs are copied back into the job class.
     *
     * NOTE(review): presumably suspends automatic id generation so copied jobs keep their
     * ids — confirm against implementations.
     *
     * @param string $objectName job class name
     */
    abstract protected function stopIdGenerator($objectName);
336
337
    /**
     * Called by resetJobsByCriterion() after a batch has been flushed; the counterpart of
     * stopIdGenerator().
     *
     * @param string $objectName job class name
     */
    abstract protected function restoreIdGenerator($objectName);
338
339
    /**
     * Re-queues one batch of archived jobs that match the criterion.
     *
     * stopIdGenerator() is called before the batch is processed and restoreIdGenerator() is
     * always called afterwards, even when fetching, resetting or flushing throws.
     *
     * @param array $criterion findBy() criteria applied to the archive repository
     * @param int   $limit     maximum number of archived jobs to fetch
     * @param int   $offset    number of matching archived jobs to skip
     *
     * @return int number of jobs that were re-queued
     */
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset)
    {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getObjectName();
        $archiveObjectName = $this->getArchiveObjectName();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $countProcessed = 0;

        $this->stopIdGenerator($objectName);
        try {
            $identifierData = $metadata->getIdentifier();
            $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
            // Order by identifier so that limit/offset paging is deterministic
            $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

            foreach ($results as $jobArchive) {
                $this->resetJob($jobArchive, $className, $countProcessed);
            }
            $objectManager->flush();
        } finally {
            // Never leave the id generator stopped, even if the batch failed
            $this->restoreIdGenerator($objectName);
        }

        return $countProcessed;
    }
372
373 17
    /**
     * Hook invoked with the calling function's name before prioritySave() and
     * resetJobsByCriterion() persist anything. Does nothing in this class.
     *
     * @param string $function name of the calling function
     */
    protected function resetSaveOk($function)
    {
        // Intentionally empty
    }
376
377
    /**
     * Turns one archived job back into a fresh job of the given class.
     *
     * When the archived job has already reached its maximum retries it is only marked with
     * STATUS_MAX_RETRIES and persisted. Otherwise a copy is created with status NEW, cleared
     * lock/timing/message fields and an incremented retry count, and the archived job is removed.
     *
     * @param RetryableJob $jobArchive     archived job to reset
     * @param string       $className      class to instantiate for the new job
     * @param int          $countProcessed running total, incremented when a job is re-queued
     */
    protected function resetJob(RetryableJob $jobArchive, $className, &$countProcessed)
    {
        $objectManager = $this->getObjectManager();

        $retriesExhausted = $this->updateMaxStatus(
            $jobArchive,
            RetryableJob::STATUS_MAX_RETRIES,
            $jobArchive->getMaxRetries(),
            $jobArchive->getRetries()
        );
        if ($retriesExhausted) {
            $objectManager->persist($jobArchive);

            return;
        }

        /** @var RetryableJob $job */
        $job = new $className();
        Util::copy($jobArchive, $job);

        // Clear everything left over from the failed execution
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setLocked(null);
        $job->setLockedAt(null);
        $job->setMessage(null);
        $job->setFinishedAt(null);
        $job->setStartedAt(null);
        $job->setElapsed(null);

        $retries = $job->getRetries() + 1;
        $job->setRetries($retries);

        $objectManager->persist($job);
        $objectManager->remove($jobArchive);
        ++$countProcessed;
    }
408
}
409