Completed
Pull Request — master (#30)
by Matthew
23:29 queued 08:10
created

DoctrineJobManager::addWorkerNameMethod()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 6
cts 6
cp 1
rs 9.6666
c 0
b 0
f 0
cc 3
eloc 5
nc 4
nop 3
crap 3
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\ODM\MongoDB\DocumentRepository;
6
use Doctrine\ORM\EntityRepository;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\RetryableJob;
9
use Dtc\QueueBundle\Model\Job;
10
use Dtc\QueueBundle\Model\JobTiming;
11
use Dtc\QueueBundle\Model\StallableJob;
12
use Dtc\QueueBundle\Model\Run;
13
use Dtc\QueueBundle\Util\Util;
14
15
abstract class DoctrineJobManager extends BaseDoctrineJobManager
16
{
17
    /** Number of jobs to prune / reset / gather at a time */
18
    const FETCH_COUNT = 100;
19
20
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
21
    const STALLED_SECONDS = 1800;
22
23
    /**
24
     * @param string $objectName
25
     */
26
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
27
28 2
    /**
     * Resets archived jobs whose status is Job::STATUS_EXCEPTION, optionally
     * restricted to a worker name and/or method.
     *
     * The archive is walked in pages of static::FETCH_COUNT entries; each page
     * is handed to resetJobsByCriterion().
     *
     * NOTE(review): the offset advances by FETCH_COUNT per page while processed
     * archive entries are removed (or presumably re-statused) by
     * resetArchiveJob(), so later pages may skip entries - confirm against
     * updateMaxStatus() before relying on a single call to process everything.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Sum of the counts returned by resetJobsByCriterion()
     */
    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $total = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);

        $criterion = ['status' => Job::STATUS_EXCEPTION];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $resetTotal = 0;
        $offset = 0;
        while ($offset < $total) {
            $resetTotal += $this->resetJobsByCriterion($criterion, static::FETCH_COUNT, $offset);
            $offset += static::FETCH_COUNT;
        }

        return $resetTotal;
    }
46
47
    /**
48
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
49
     *
50
     * @param string|null $workerName
51
     * @param string|null $method
52
     *
53
     * @return mixed
54
     */
55
    abstract protected function updateExpired($workerName = null, $method = null);
56
57 9
    /**
     * Adds 'workerName' and/or 'method' entries to a findBy-style criterion
     * array, skipping whichever of the two values is null.
     *
     * @param array       $criterion  Criterion array, modified in place
     * @param string|null $workerName
     * @param string|null $method
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        $optionalFilters = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($optionalFilters as $field => $value) {
            if (null !== $value) {
                $criterion[$field] = $value;
            }
        }
    }
66
67 3
    /**
     * Marks expired jobs via updateExpired() and then removes every job whose
     * status is Job::STATUS_EXPIRED, optionally restricted to a worker name
     * and/or method.
     *
     * Jobs are removed in batches of static::FETCH_COUNT; after each batch is
     * flushed, one JobTiming::STATUS_FINISHED_EXPIRED timing is recorded per
     * removed job.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Number of jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;

        // $i only bounds the number of batches. Each batch is removed and
        // flushed before the next query runs, so the remaining expired jobs
        // always start at offset 0; paging with an increasing offset would
        // skip FETCH_COUNT still-expired jobs on every pass.
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, 0);
            $innerCount = 0;
            if (!empty($expiredJobs)) {
                foreach ($expiredJobs as $expiredJob) {
                    /* @var Job $expiredJob */
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
                    $objectManager->remove($expiredJob);
                    ++$finalCount;
                    ++$innerCount;
                }
            }
            $this->flush();

            // Timings are recorded only after the removals were flushed.
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
        }

        return $finalCount;
    }
95
96 4
    /**
     * Collects the jobs currently in running status (optionally restricted to
     * a worker name and/or method) and returns those that
     * extractStalledJobs() classifies as stalled.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return array Stalled jobs
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $runningCount = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);

        $criterion = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        return $this->extractStalledJobs($this->findRunningJobs($criterion, $runningCount));
    }
107
108 4
    /**
     * Loads the jobs matching $criterion in pages of static::FETCH_COUNT and
     * groups them by run id. Jobs whose run id is null are left out.
     *
     * @param array $criterion findBy-style criterion
     * @param int   $count     Upper bound on the number of jobs to page through
     *
     * @return array Map of run id => list of jobs belonging to that run
     */
    protected function findRunningJobs($criterion, $count)
    {
        $repository = $this->getRepository();
        $jobsByRunId = [];

        $offset = 0;
        while ($offset < $count) {
            $page = $repository->findBy($criterion, null, static::FETCH_COUNT, $offset);
            $offset += static::FETCH_COUNT;

            if (empty($page)) {
                continue;
            }

            foreach ($page as $job) {
                /** @var StallableJob $job */
                $runId = $job->getRunId();
                if (null === $runId) {
                    continue;
                }
                $jobsByRunId[$runId][] = $job;
            }
        }

        return $jobsByRunId;
    }
127
128
    /**
     * Looks up the run with the given id in the run repository and, when it
     * exists, appends to $stalledJobs every job in $jobs that is not the
     * run's current job. Nothing is appended when the run is not found.
     *
     * @param mixed $runId       Identifier of the run the jobs are attached to
     * @param array $jobs        Running jobs that reference $runId
     * @param array $stalledJobs Accumulator, modified in place
     */
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
    {
        $run = $this->getObjectManager()
            ->getRepository($this->getRunManager()->getRunClass())
            ->find($runId);

        if (!$run) {
            return;
        }

        foreach ($jobs as $job) {
            // The job the run is working on right now is not stalled.
            if ($run->getCurrentJobId() != $job->getId()) {
                $stalledJobs[] = $job;
            }
        }
    }
146
147
    /**
     * Runs both stalled-job checks (run repository and run archive) for every
     * run id present in $runningJobsById and returns the accumulated result.
     *
     * @param array $runningJobsById Map of run id => list of running jobs
     *
     * @return array Jobs considered stalled
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $stalledJobs = [];

        foreach ($runningJobsById as $runId => $jobsForRun) {
            $this->extractStalledLiveRuns($runId, $jobsForRun, $stalledJobs);
            $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId);
        }

        return $stalledJobs;
    }
162
163 4
    /**
     * Appends the jobs of $runId to $stalledJobs when the run is found in the
     * run archive and ended more than static::STALLED_SECONDS seconds ago.
     *
     * Does nothing when the run manager exposes neither getObjectManager()
     * nor getRunArchiveClass(), when the archived run is not found, or when it
     * has no end time.
     *
     * @param array $runningJobsById Map of run id => list of running jobs
     * @param array $stalledJobs     Accumulator, modified in place
     * @param mixed $runId           Run id to check
     */
    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
    {
        $runManager = $this->getRunManager();
        if (!method_exists($runManager, 'getObjectManager') || !method_exists($runManager, 'getRunArchiveClass')) {
            return;
        }

        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
        $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());

        /** @var Run $run */
        $run = $runArchiveRepository->find($runId);
        if (!$run) {
            return;
        }

        $endTime = $run->getEndedAt();
        if (!$endTime) {
            return;
        }

        // Did the run end more than STALLED_SECONDS ago?
        $secondsSinceEnd = time() - $endTime->getTimestamp();
        if ($secondsSinceEnd > static::STALLED_SECONDS) {
            $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
        }
    }
185
186 2
    /**
     * Marks one batch of stalled jobs as StallableJob::STATUS_STALLED and
     * passes each of them to saveHistory().
     *
     * The batch covers indexes $i up to (but excluding) the smaller of
     * $i + static::FETCH_COUNT and $count.
     *
     * @param int   $i           Index of the first job in the batch
     * @param int   $count       Total number of entries in $stalledJobs
     * @param array $stalledJobs Zero-indexed list of stalled jobs
     *
     * @return int Number of jobs for which saveHistory() returned a truthy value
     */
    protected function runStalledLoop($i, $count, array $stalledJobs)
    {
        $batchEnd = min($i + static::FETCH_COUNT, $count);
        $resetCount = 0;

        for ($index = $i; $index < $batchEnd; ++$index) {
            /* StallableJob $job */
            $job = $stalledJobs[$index];
            $job->setStatus(StallableJob::STATUS_STALLED);
            $resetCount += $this->saveHistory($job) ? 1 : 0;
        }

        return $resetCount;
    }
200
201 2
    /**
     * Processes all stalled jobs (optionally restricted to a worker name
     * and/or method) in batches of static::FETCH_COUNT.
     *
     * Each batch is handled by runStalledLoop(); afterwards one
     * JobTiming::STATUS_FINISHED_STALLED timing is recorded per job in the
     * batch and pending changes are flushed.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Sum of the counts returned by runStalledLoop()
     */
    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $total = count($stalledJobs);

        $countReset = 0;
        $offset = 0;
        while ($offset < $total) {
            $countReset += $this->runStalledLoop($offset, $total, $stalledJobs);

            // One timing entry per job in this batch, whether or not it was reset.
            $batchEnd = min($offset + static::FETCH_COUNT, $total);
            for ($position = $offset; $position < $batchEnd; ++$position) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }

            $this->flush();
            $offset += static::FETCH_COUNT;
        }

        return $countReset;
    }
217
218
    /**
     * Deletes all stalled jobs, optionally restricted to a worker name and/or
     * method.
     *
     * Every stalled job is set to StallableJob::STATUS_STALLED, has its stall
     * counter incremented and is passed to deleteJob(). Changes are flushed
     * after every static::FETCH_COUNT jobs.
     *
     * @param string $workerName
     * @param string $method
     *
     * @return int Number of jobs processed
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);

        $countProcessed = 0;
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $job) {
                /** @var StallableJob $job */
                $job->setStatus(StallableJob::STATUS_STALLED);
                $job->setStalls(intval($job->getStalls()) + 1);
                $this->deleteJob($job);
                ++$countProcessed;
            }
            $this->flush();
        }

        return $countProcessed;
    }
241
242 8
    /**
     * Deletes the job unless it is going to be retried.
     *
     * @param StallableJob $job
     * @param mixed        $retry Truthy when the job will be retried
     *
     * @return mixed The $retry value, unchanged
     */
    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if ($retry) {
            return $retry;
        }

        $this->deleteJob($job);

        return $retry;
    }
250
251 43
    /**
     * Stamps the job with a hash of its identity and persists it.
     *
     * The hash is the SHA-256 of the serialized job class, method, worker
     * name and arguments. When the job's batch flag is exactly true,
     * updateNearestBatch() is consulted first; a truthy result is returned
     * as-is and the new job is not persisted.
     *
     * @param StallableJob $job
     *
     * @return mixed The persisted job, or the value returned by updateNearestBatch()
     */
    protected function stallableSave(StallableJob $job)
    {
        // Hash identifying the job by class, method, worker and arguments
        $identity = [get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs()];
        $job->setCrcHash(hash('sha256', serialize($identity)));
        $objectManager = $this->getObjectManager();

        if (true === $job->getBatch() && ($existingJob = $this->updateNearestBatch($job))) {
            return $existingJob;
        }

        // No batch job was updated: store this one as a new job
        $this->resetSaveOk(__FUNCTION__);
        $objectManager->persist($job);
        $objectManager->flush();

        return $job;
    }
273
274
    /**
     * Called by stallableSave() for jobs whose batch flag is true.
     *
     * A truthy return value is handed back to the caller of stallableSave()
     * in place of the new job, which is then not persisted; a falsy return
     * value lets the new job be saved normally.
     *
     * @param Job $job
     *
     * @return mixed presumably the existing job that was updated, or a falsy value - confirm in implementations
     */
    abstract protected function updateNearestBatch(Job $job);
275
276
    /**
277
     * @param string $objectName
278
     */
279
    abstract protected function stopIdGenerator($objectName);
280
281
    /**
     * Counterpart of stopIdGenerator(): resetJobsByCriterion() calls it with
     * the job class name once the archived jobs have been processed and
     * flushed.
     *
     * @param string $objectName
     */
    abstract protected function restoreIdGenerator($objectName);
282
283
    /**
     * Processes one page of archived jobs matching $criterion, ordered
     * ascending by the job class's first identifier field ('id' when the
     * metadata reports none), handing each entry to resetArchiveJob().
     *
     * The id generator for the job class is stopped before the page is read
     * and restored after the final flush.
     *
     * @param array $criterion findBy-style criterion for the archive repository
     * @param int   $limit     Maximum number of archived jobs to process
     * @param int   $offset    Offset of the first archived job to process
     *
     * @return int Sum of the values returned by resetArchiveJob()
     */
    private function resetJobsByCriterion(array $criterion, $limit, $offset)
    {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);

        $jobClass = $this->getJobClass();
        $archiveClass = $this->getJobArchiveClass();
        $jobRepository = $objectManager->getRepository($jobClass);
        $archiveRepository = $objectManager->getRepository($archiveClass);
        $metadata = $objectManager->getClassMetadata($jobRepository->getClassName());

        $this->stopIdGenerator($jobClass);

        $identifierFields = $metadata->getIdentifier();
        $idColumn = isset($identifierFields[0]) ? $identifierFields[0] : 'id';
        $archivedJobs = $archiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

        $countProcessed = 0;
        foreach ($archivedJobs as $archivedJob) {
            $countProcessed += $this->resetArchiveJob($archivedJob);
        }
        $objectManager->flush();

        $this->restoreIdGenerator($jobClass);

        return $countProcessed;
    }
316
317 22
    /**
     * Hook invoked with the calling function's name by stallableSave() and
     * resetJobsByCriterion() before they persist anything. It does nothing
     * in this class.
     *
     * @param string $function Name of the calling function (__FUNCTION__)
     */
    protected function resetSaveOk($function)
    {
        // Intentionally empty.
    }
320
321
    /**
322
     * @param string|null $workerName
323
     * @param string|null $methodName
324
     * @param bool $prioritize
325
     */
326
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);
327
328
    /**
     * Moves one archived job back into the live job class, unless
     * updateMaxStatus() reports that it hit its retry limit.
     *
     * When updateMaxStatus() returns a truthy value the archived job is only
     * persisted. Otherwise a new instance of the job class is created, the
     * archived job is copied into it, the copy is passed to resetJob(), and
     * the archived job is removed and the change flushed.
     *
     * @param StallableJob $jobArchive
     *
     * @return int Number of jobs reset (0 or 1)
     */
    protected function resetArchiveJob(StallableJob $jobArchive)
    {
        $objectManager = $this->getObjectManager();

        $retriesExhausted = $this->updateMaxStatus(
            $jobArchive,
            StallableJob::STATUS_MAX_RETRIES,
            $jobArchive->getMaxRetries(),
            $jobArchive->getRetries()
        );
        if ($retriesExhausted) {
            $objectManager->persist($jobArchive);

            return 0;
        }

        $jobClass = $this->getJobClass();
        /** @var StallableJob $liveJob */
        $liveJob = new $jobClass();
        Util::copy($jobArchive, $liveJob);
        $this->resetJob($liveJob);
        $objectManager->remove($jobArchive);
        $this->flush();

        return 1;
    }
353
354 8
    /**
     * Puts a job back into BaseJob::STATUS_NEW: clears its message, finish
     * time, start time and elapsed time, increments its retry counter, then
     * persists and flushes it.
     *
     * @param RetryableJob $job Must also be a StallableJob
     *
     * @return bool Always true
     *
     * @throws \InvalidArgumentException When $job is not a StallableJob
     */
    protected function resetJob(RetryableJob $job)
    {
        if ($job instanceof StallableJob) {
            $job->setStatus(BaseJob::STATUS_NEW);
            $job->setMessage(null);
            $job->setFinishedAt(null);
            $job->setStartedAt(null);
            $job->setElapsed(null);

            $nextRetryCount = $job->getRetries() + 1;
            $job->setRetries($nextRetryCount);

            $this->getObjectManager()->persist($job);
            $this->flush();

            return true;
        }

        throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
    }
370
371
    /**
     * NOTE(review): not called anywhere in this class; presumably returns the
     * worker names and their methods known to the job store - confirm the
     * return shape in the concrete managers.
     */
    abstract public function getWorkersAndMethods();
372
373
    /**
     * NOTE(review): not called anywhere in this class; presumably counts the
     * jobs that are still live, optionally filtered by worker name and method
     * name - confirm in the concrete managers.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     */
    abstract public function countLiveJobs($workerName = null, $methodName = null);
374
375
    /**
     * NOTE(review): $progressCallback has no default but follows two optional
     * parameters, so callers must always pass all three arguments; PHP 8.0+
     * reports a deprecation for this parameter order. The signature is kept
     * as-is because implementations must match it.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param mixed       $progressCallback presumably a callable invoked as jobs are archived - confirm in implementations
     */
    abstract public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback);
376
}
377