Completed
Push — master ( 203a9c...2f9634 )
by Matthew
05:23 queued 25s
created

BaseJobManager::pruneExpiredJobs()   B

Complexity

Conditions 4
Paths 3

Size

Total Lines 23
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 23
ccs 16
cts 16
cp 1
rs 8.7972
c 0
b 0
f 0
cc 4
eloc 16
nc 3
nop 2
crap 4
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityManager;
9
use Doctrine\ORM\EntityRepository;
10
use Dtc\QueueBundle\Model\AbstractJobManager;
11
use Dtc\QueueBundle\Model\BaseJob;
12
use Dtc\QueueBundle\Model\Job;
13
use Dtc\QueueBundle\Model\RetryableJob;
14
use Dtc\QueueBundle\Model\Run;
15
use Dtc\QueueBundle\Util\Util;
16
17
abstract class BaseJobManager extends AbstractJobManager
18
{
19
    /** Number of jobs to prune / reset / gather at a time */
    const FETCH_COUNT = 100;

    /** Number of seconds before a job is considered stalled if the runner is no longer active */
    const STALLED_SECONDS = 1800;

    /** @var ObjectManager Object manager used for every repository lookup, persist, remove and flush */
    protected $objectManager;

    /** @var string Object name handed to the object manager to obtain the job repository */
    protected $objectName;

    /** @var string Object name handed to the object manager to obtain the job archive repository */
    protected $archiveObjectName;

    /** @var string Object name used to look up runs when detecting stalled jobs */
    protected $runClass;

    /** @var string Object name used to look up archived runs when detecting stalled jobs */
    protected $runArchiveClass;

    /** @var string|null spl_object_hash() of the EntityManager on which save() inserted a new job, if any */
    protected static $saveInsertCalled = null;

    /** @var string|null spl_object_hash() of the EntityManager on which a reset re-inserted jobs, if any */
    protected static $resetInsertCalled = null;
31
32
    /**
     * Stores the object manager and the object names this manager works with.
     *
     * @param ObjectManager $objectManager     object manager used for all persistence operations
     * @param string        $objectName        object name of the jobs
     * @param string        $archiveObjectName object name of the archived jobs
     * @param string        $runClass          object name of the runs
     * @param string        $runArchiveClass   object name of the archived runs
     */
    public function __construct(
        ObjectManager $objectManager,
        $objectName,
        $archiveObjectName,
        $runClass,
        $runArchiveClass
    ) {
        $this->objectManager = $objectManager;

        // Job and job-archive object names
        $this->objectName = $objectName;
        $this->archiveObjectName = $archiveObjectName;

        // Run and run-archive object names
        $this->runClass = $runClass;
        $this->runArchiveClass = $runArchiveClass;
    }
50
51
    /**
     * Returns the object manager supplied to the constructor.
     *
     * @return ObjectManager
     */
    public function getObjectManager()
    {
        return $this->objectManager;
    }
58
59
    /**
     * Returns the job object name supplied to the constructor.
     *
     * @return string
     */
    public function getObjectName()
    {
        return $this->objectName;
    }
66
67
    /**
     * Returns the job archive object name supplied to the constructor.
     *
     * @return string
     */
    public function getArchiveObjectName()
    {
        return $this->archiveObjectName;
    }
74
75
    /**
     * Returns the run object name supplied to the constructor.
     *
     * @return string
     */
    public function getRunClass()
    {
        return $this->runClass;
    }
82
83
    /**
     * Returns the run archive object name supplied to the constructor.
     *
     * @return string
     */
    public function getRunArchiveClass()
    {
        return $this->runArchiveClass;
    }
90
91
    /**
     * Returns the repository the object manager provides for the job object name.
     *
     * @return ObjectRepository
     */
    public function getRepository()
    {
        $objectManager = $this->getObjectManager();

        return $objectManager->getRepository($this->getObjectName());
    }
98
99
    /**
     * Counts the objects of the given name that have the given status.
     *
     * The result is used by callers in this class as the upper bound of their paging loops.
     *
     * @param string      $objectName object name to count in
     * @param mixed       $status     status value to match (callers pass Job::STATUS_* constants)
     * @param string|null $workerName optional worker name; presumably narrows the count — confirm in implementations
     * @param string|null $method     optional method name; presumably narrows the count — confirm in implementations
     *
     * @return int presumably the number of matching objects — confirm in implementations
     */
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
103
104 2
    /**
     * Re-queues archived jobs whose status is Job::STATUS_ERROR, in batches of FETCH_COUNT.
     *
     * @param string|null $workerName only reset jobs of this worker when not null
     * @param string|null $method     only reset jobs of this method when not null
     *
     * @return int number of jobs that were re-queued
     */
    public function resetErroneousJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getArchiveObjectName(), Job::STATUS_ERROR, $workerName, $method);

        $criterion = ['status' => Job::STATUS_ERROR];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $countProcessed = 0;
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            // Every archived job handled by a batch stops matching the criterion
            // (it is either removed or moved to another status, then flushed), so
            // the remaining matches always start at offset 0. Paging with a growing
            // offset would skip FETCH_COUNT jobs on every pass after the first.
            $countProcessed += $this->resetJobsByCriterion($criterion, static::FETCH_COUNT, 0);
        }

        return $countProcessed;
    }
119
120
    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * @param string|null $workerName optional worker name; presumably restricts the update — confirm in implementations
     * @param string|null $method     optional method name; presumably restricts the update — confirm in implementations
     *
     * @return mixed pruneExpiredJobs() uses the result as the upper bound of its paging loop,
     *               so presumably the number of jobs marked expired — TODO confirm
     */
    abstract protected function updateExpired($workerName = null, $method = null);
129
130 8
    /**
     * Adds 'workerName' and / or 'method' entries to a criterion array when they are not null.
     *
     * @param array       $criterion  criterion array, modified in place
     * @param string|null $workerName value for the 'workerName' key, skipped when null
     * @param string|null $method     value for the 'method' key, skipped when null
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        $optionalFilters = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($optionalFilters as $field => $value) {
            if (null === $value) {
                continue;
            }
            $criterion[$field] = $value;
        }
    }
139
140 2
    /**
     * Marks expired jobs via updateExpired() and then removes every job whose status is
     * Job::STATUS_EXPIRED, in batches of FETCH_COUNT.
     *
     * @param string|null $workerName only prune jobs of this worker when not null
     * @param string|null $method     only prune jobs of this method when not null
     *
     * @return int number of jobs that were removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            // Each batch is removed and flushed before the next fetch, so the jobs that
            // are still expired always start at offset 0. Fetching at offset $i would
            // skip FETCH_COUNT surviving jobs on every pass after the first.
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, 0);
            if (!empty($expiredJobs)) {
                foreach ($expiredJobs as $expiredJob) {
                    /* @var Job $expiredJob */
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
                    $objectManager->remove($expiredJob);
                    ++$finalCount;
                }
            }
            $objectManager->flush();
        }

        return $finalCount;
    }
163
164 4
    /**
     * Collects the running jobs (optionally narrowed by worker name / method) and returns
     * those that extractStalledJobs() considers stalled.
     *
     * @param string|null $workerName only consider jobs of this worker when not null
     * @param string|null $method     only consider jobs of this method when not null
     *
     * @return array list of stalled jobs
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $runningCount = $this->countJobsByStatus(
            $this->getObjectName(),
            Job::STATUS_RUNNING,
            $workerName,
            $method
        );

        $runningCriterion = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($runningCriterion, $workerName, $method);

        return $this->extractStalledJobs(
            $this->findRunningJobs($runningCriterion, $runningCount)
        );
    }
175
176 4
    /**
     * Pages through the jobs matching the criterion and groups them by run id.
     * Jobs whose run id is null are left out.
     *
     * @param array $criterion criterion passed to the repository's findBy()
     * @param int   $count     number of jobs to page through
     *
     * @return array map of run id => list of jobs belonging to that run
     */
    protected function findRunningJobs($criterion, $count)
    {
        $jobRepository = $this->getRepository();
        $jobsByRunId = [];

        for ($offset = 0; $offset < $count; $offset += static::FETCH_COUNT) {
            $page = $jobRepository->findBy($criterion, null, static::FETCH_COUNT, $offset);
            if (empty($page)) {
                continue;
            }

            foreach ($page as $runningJob) {
                /** @var RetryableJob $runningJob */
                $runId = $runningJob->getRunId();
                if (null === $runId) {
                    continue;
                }
                $jobsByRunId[$runId][] = $runningJob;
            }
        }

        return $jobsByRunId;
    }
195
196 4
    /**
     * Picks out the jobs whose run is no longer found in the run repository and whose
     * archived run ended more than STALLED_SECONDS ago.
     *
     * @param array $runningJobsById map of run id => list of jobs belonging to that run
     *
     * @return array flat list of stalled jobs
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $objectManager = $this->getObjectManager();
        $runRepository = $objectManager->getRepository($this->runClass);
        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
        $runArchiveRepository = $objectManager->getRepository($this->runArchiveClass);

        $stalledJobs = [];
        foreach ($runningJobsById as $runId => $jobsOfRun) {
            // The run can still be found in the run repository: not stalled
            if ($runRepository->find($runId)) {
                continue;
            }

            /** @var Run $archivedRun */
            $archivedRun = $runArchiveRepository->find($runId);
            if (!$archivedRun) {
                continue;
            }

            $endedAt = $archivedRun->getEndedAt();
            if (!$endedAt) {
                continue;
            }

            // Did the run end more than STALLED_SECONDS ago?
            $secondsSinceEnd = time() - $endedAt->getTimestamp();
            if ($secondsSinceEnd > static::STALLED_SECONDS) {
                $stalledJobs = array_merge($stalledJobs, $jobsOfRun);
            }
        }

        return $stalledJobs;
    }
221
222 2
    /**
     * Puts stalled jobs back to BaseJob::STATUS_NEW so they can run again, flushing once per
     * batch of FETCH_COUNT jobs. Jobs that reached their maximum stalled count or maximum
     * retries are given the matching status and removed instead.
     *
     * @param string|null $workerName only reset jobs of this worker when not null
     * @param string|null $method     only reset jobs of this method when not null
     *
     * @return int number of jobs that were reset to new
     */
    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $objectManager = $this->getObjectManager();

        $countProcessed = 0;
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $job) {
                /** @var RetryableJob $job */
                $job->setStalledCount($job->getStalledCount() + 1);

                $maxStalled = $job->getMaxStalled();
                if (null !== $maxStalled && $job->getStalledCount() >= $maxStalled) {
                    $job->setStatus(RetryableJob::STATUS_MAX_STALLED);
                    $objectManager->remove($job);
                    continue;
                }

                $maxRetries = $job->getMaxRetries();
                if (null !== $maxRetries && $job->getRetries() >= $maxRetries) {
                    $job->setStatus(RetryableJob::STATUS_MAX_RETRIES);
                    $objectManager->remove($job);
                    continue;
                }

                // Still allowed to run: count the retry and release the lock
                $job->setRetries($job->getRetries() + 1);
                $job->setStatus(BaseJob::STATUS_NEW);
                $job->setLocked(null);
                $job->setLockedAt(null);
                $objectManager->persist($job);
                ++$countProcessed;
            }
            $objectManager->flush();
        }

        return $countProcessed;
    }
257
258
    /**
     * Removes stalled jobs, flushing once per batch of FETCH_COUNT jobs. Each job gets
     * BaseJob::STATUS_ERROR and the message 'stalled' before removal, or
     * RetryableJob::STATUS_MAX_STALLED when it reached its maximum stalled count.
     *
     * @param string|null $workerName only prune jobs of this worker when not null
     * @param string|null $method     only prune jobs of this method when not null
     *
     * @return int number of jobs that were removed
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $objectManager = $this->getObjectManager();

        $countProcessed = 0;
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $job) {
                /** @var RetryableJob $job */
                $job->setStalledCount($job->getStalledCount() + 1);
                $job->setStatus(BaseJob::STATUS_ERROR);
                $job->setMessage('stalled');

                $maxStalled = $job->getMaxStalled();
                if (null !== $maxStalled && $job->getStalledCount() >= $maxStalled) {
                    $job->setStatus(RetryableJob::STATUS_MAX_STALLED);
                }

                $objectManager->remove($job);
                ++$countProcessed;
            }
            $objectManager->flush();
        }

        return $countProcessed;
    }
286
287 8
    /**
     * Removes the job through the object manager and flushes immediately.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to remove
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $manager = $this->getObjectManager();

        $manager->remove($job);
        $manager->flush();
    }
293
294
    /**
     * Saves the job's history by deleting the job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to delete
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        // Should cause job to be archived
        $this->deleteJob($job);
    }
298
299 22
    /**
     * Persists a job and flushes.
     *
     * A hash of the job's class name, method, worker name and args is stored on the job first.
     * For a batch job, an existing job with the same hash and status BaseJob::STATUS_NEW is
     * updated (higher priority, earlier whenAt) and returned instead of saving the new one.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to save
     *
     * @return \Dtc\QueueBundle\Model\Job the saved job, or the existing job it was merged into
     *
     * @throws \Exception when a new job is inserted through an EntityManager that already
     *                    performed a reset in this process
     */
    public function save(\Dtc\QueueBundle\Model\Job $job)
    {
        // Todo: Serialize args

        // Generate crc hash for the job
        $crcHash = hash('sha256', serialize([
            $job->getClassName(),
            $job->getMethod(),
            $job->getWorkerName(),
            $job->getArgs(),
        ]));
        $job->setCrcHash($crcHash);
        $objectManager = $this->getObjectManager();

        if (true === $job->getBatch()) {
            // See if similar job that hasn't run exists
            $oldJob = $this->getRepository()->findOneBy([
                'crcHash' => $crcHash,
                'status' => BaseJob::STATUS_NEW,
            ]);

            if ($oldJob) {
                // Old job exists - keep it, with the higher priority and the earlier whenAt
                $oldJob->setPriority(max($job->getPriority(), $oldJob->getPriority()));
                $oldJob->setWhenAt(min($job->getWhenAt(), $oldJob->getWhenAt()));
                $oldJob->setBatch(true);
                $objectManager->persist($oldJob);
                $objectManager->flush();

                return $oldJob;
            }
        }

        // Just save a new job
        if (!$job->getId() && $objectManager instanceof EntityManager) {
            $managerHash = spl_object_hash($objectManager);
            if (null !== self::$resetInsertCalled && $managerHash === self::$resetInsertCalled) {
                // Insert SQL is cached...
                throw new \Exception("Can't call save and reset within the same process cycle");
            }
            self::$saveInsertCalled = $managerHash;
        }

        $objectManager->persist($job);
        $objectManager->flush();

        return $job;
    }
339
340
    /**
     * Called with the job object name before archived jobs are re-inserted as jobs.
     * Presumably turns off automatic id generation so re-inserted jobs keep their
     * copied ids — confirm in implementations.
     *
     * @param string $objectName object name of the jobs
     */
    abstract protected function stopIdGenerator($objectName);
344
345
    /**
     * Called with the job object name after archived jobs have been re-inserted and flushed.
     * Presumably undoes stopIdGenerator() — confirm in implementations.
     *
     * @param string $objectName object name of the jobs
     */
    abstract protected function restoreIdGenerator($objectName);
346
347
    /**
     * Moves one page of archived jobs matching the criterion back into the job collection.
     *
     * Each archived job that has not reached its maximum retries is copied into a fresh job
     * object, reset to BaseJob::STATUS_NEW with its retry count incremented, persisted, and
     * the archive entry is removed. Archived jobs that reached their maximum retries are
     * only marked RetryableJob::STATUS_MAX_RETRIES. Everything is flushed once at the end.
     *
     * @param array $criterion criterion passed to the archive repository's findBy()
     * @param int   $limit     maximum number of archived jobs to fetch
     * @param int   $offset    offset of the first archived job to fetch
     *
     * @return int number of jobs that were re-queued
     *
     * @throws \Exception when called on an EntityManager that already inserted a job through
     *                    save() in this process
     */
    private function resetJobsByCriterion(
        $criterion,
        $limit,
        $offset)
    {
        $objectManager = $this->getObjectManager();
        if ($objectManager instanceof EntityManager) {
            if (null !== self::$saveInsertCalled && spl_object_hash($objectManager) === self::$saveInsertCalled) {
                // Insert SQL is cached...
                throw new \Exception("Can't call reset and save within the same process cycle");
            }
            self::$resetInsertCalled = spl_object_hash($objectManager);
        }

        $objectName = $this->getObjectName();
        $archiveObjectName = $this->getArchiveObjectName();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $countProcessed = 0;

        $this->stopIdGenerator($objectName);
        try {
            $identifierData = $metadata->getIdentifier();
            $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
            $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

            foreach ($results as $jobArchive) {
                /** @var RetryableJob $jobArchive */
                if (null !== ($maxRetries = $jobArchive->getMaxRetries()) && $jobArchive->getRetries() >= $maxRetries) {
                    $jobArchive->setStatus(RetryableJob::STATUS_MAX_RETRIES);
                    $objectManager->persist($jobArchive);
                    continue;
                }

                /** @var RetryableJob $job */
                $job = new $className();

                Util::copy($jobArchive, $job);
                $job->setStatus(BaseJob::STATUS_NEW);
                $job->setLocked(null);
                $job->setLockedAt(null);
                $job->setMessage(null);
                $job->setFinishedAt(null);
                $job->setStartedAt(null);
                $job->setElapsed(null);
                $job->setRetries($job->getRetries() + 1);

                $objectManager->persist($job);
                $objectManager->remove($jobArchive);
                ++$countProcessed;
            }
            $objectManager->flush();
        } finally {
            // Always pair stopIdGenerator() with restoreIdGenerator(), even when the
            // fetch, copy or flush throws; the exception still propagates to the caller.
            $this->restoreIdGenerator($objectName);
        }

        return $countProcessed;
    }
408
}
409