Completed
Push — master ( 6c3722...239355 )
by Matthew
19:49 queued 17:50
created

BaseJobManager::updateMaxStatus()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 0
cts 8
cp 0
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 2
nop 4
crap 12
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityRepository;
9
use Dtc\QueueBundle\Model\AbstractJobManager;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\Job;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Model\Run;
14
use Dtc\QueueBundle\Util\Util;
15
16
abstract class BaseJobManager extends AbstractJobManager
17
{
18
    /** Number of jobs to prune / reset / gather at a time */
19
    const FETCH_COUNT = 100;
20
21
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
22
    const STALLED_SECONDS = 1800;
23
    protected $objectManager;
24
    protected $objectName;
25
    protected $archiveObjectName;
26
    protected $runClass;
27
    protected $runArchiveClass;
28
29
    /**
30
     * @param string $objectName
31
     * @param string $archiveObjectName
32
     * @param string $runClass
33
     * @param string $runArchiveClass
34
     */
35
    public function __construct(ObjectManager $objectManager,
36
        $objectName,
37
        $archiveObjectName,
38
        $runClass,
39
        $runArchiveClass)
40
    {
41
        $this->objectManager = $objectManager;
42
        $this->objectName = $objectName;
43
        $this->archiveObjectName = $archiveObjectName;
44
        $this->runClass = $runClass;
45
        $this->runArchiveClass = $runArchiveClass;
46
    }
47
48
    /**
49
     * @return ObjectManager
50
     */
51
    public function getObjectManager()
52
    {
53
        return $this->objectManager;
54
    }
55
56
    /**
57
     * @return string
58
     */
59
    public function getObjectName()
60
    {
61
        return $this->objectName;
62
    }
63
64
    /**
65
     * @return string
66
     */
67
    public function getArchiveObjectName()
68
    {
69
        return $this->archiveObjectName;
70
    }
71
72
    /**
73
     * @return string
74
     */
75
    public function getRunClass()
76
    {
77
        return $this->runClass;
78
    }
79
80
    /**
81
     * @return string
82
     */
83
    public function getRunArchiveClass()
84
    {
85
        return $this->runArchiveClass;
86
    }
87
88
    /**
89
     * @return ObjectRepository
90
     */
91
    public function getRepository()
92
    {
93
        return $this->getObjectManager()->getRepository($this->getObjectName());
94
    }
95
96
    /**
97
     * @param string $objectName
98
     */
99
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
100
101
    public function resetErroneousJobs($workerName = null, $method = null)
102
    {
103
        $count = $this->countJobsByStatus($this->getArchiveObjectName(), Job::STATUS_ERROR, $workerName, $method);
104
105
        $criterion = ['status' => Job::STATUS_ERROR];
106
        $this->addWorkerNameMethod($criterion, $workerName, $method);
107
108
        $countProcessed = 0;
109
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
110
            $countProcessed += $this->resetJobsByCriterion(
111
                $criterion, static::FETCH_COUNT, $i);
112
        }
113
114
        return $countProcessed;
115
    }
116
117
    /**
118
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
119
     *
120
     * @param null $workerName
121
     * @param null $method
122
     *
123
     * @return mixed
124
     */
125
    abstract protected function updateExpired($workerName = null, $method = null);
126
127
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
128
    {
129
        if (null !== $workerName) {
130
            $criterion['workerName'] = $workerName;
131
        }
132
        if (null !== $method) {
133
            $criterion['method'] = $method;
134
        }
135
    }
136
137
    public function pruneExpiredJobs($workerName = null, $method = null)
138
    {
139
        $count = $this->updateExpired($workerName, $method);
140
        $criterion = ['status' => Job::STATUS_EXPIRED];
141
        $this->addWorkerNameMethod($criterion, $workerName, $method);
142
        $objectManager = $this->getObjectManager();
143
        $repository = $this->getRepository();
144
        $finalCount = 0;
145
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
146
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
147
            if (!empty($expiredJobs)) {
148
                foreach ($expiredJobs as $expiredJob) {
149
                    /* @var Job $expiredJob */
150
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
151
                    $objectManager->remove($expiredJob);
152
                    ++$finalCount;
153
                }
154
            }
155
            $objectManager->flush();
156
        }
157
158
        return $finalCount;
159
    }
160
161
    protected function getStalledJobs($workerName = null, $method = null)
162
    {
163
        $count = $this->countJobsByStatus($this->getObjectName(), Job::STATUS_RUNNING, $workerName, $method);
164
165
        $criterion = ['status' => BaseJob::STATUS_RUNNING];
166
        $this->addWorkerNameMethod($criterion, $workerName, $method);
167
168
        $runningJobs = $this->findRunningJobs($criterion, $count);
169
170
        return $this->extractStalledJobs($runningJobs);
171
    }
172
173
    protected function findRunningJobs($criterion, $count)
174
    {
175
        $repository = $this->getRepository();
176
        $runningJobsById = [];
177
178
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
179
            $runningJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
180
            if (!empty($runningJobs)) {
181
                foreach ($runningJobs as $job) {
182
                    /** @var RetryableJob $job */
183
                    if (null !== $runId = $job->getRunId()) {
184
                        $runningJobsById[$runId][] = $job;
185
                    }
186
                }
187
            }
188
        }
189
190
        return $runningJobsById;
191
    }
192
193
    protected function extractStalledJobs(array $runningJobsById)
194
    {
195
        $objectManager = $this->getObjectManager();
196
        $runRepository = $objectManager->getRepository($this->runClass);
197
        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
198
        $runArchiveRepository = $objectManager->getRepository($this->runArchiveClass);
199
200
        $stalledJobs = [];
201
        foreach (array_keys($runningJobsById) as $runId) {
202
            if ($runRepository->find($runId)) {
203
                continue;
204
            }
205
            /** @var Run $run */
206
            if ($run = $runArchiveRepository->find($runId)) {
207
                if ($endTime = $run->getEndedAt()) {
208
                    // Did it end over an hour ago
209
                    if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
210
                        $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
211
                    }
212
                }
213
            }
214
        }
215
216
        return $stalledJobs;
217
    }
218
219
    protected function updateMaxStatus(RetryableJob $job, $status, $max = null, $count = 0)
220
    {
221
        if (null !== $max && $count >= $max) {
222
            $job->setStatus($status);
223
224
            return true;
225
        }
226
227
        return false;
228
    }
229
230
    public function resetStalledJobs($workerName = null, $method = null)
231
    {
232
        $stalledJobs = $this->getStalledJobs($workerName, $method);
233
234
        $objectManager = $this->getObjectManager();
235
236
        $countProcessed = 0;
237
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
238
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
239
                $job = $stalledJobs[$j];
240
                /* RetryableJob $job */
241
                $job->setStalledCount($job->getStalledCount() + 1);
242
                if ($this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount())) {
243
                    $objectManager->remove($job);
244
                    continue;
245
                } elseif ($this->updateMaxStatus($job, RetryableJob::STATUS_MAX_RETRIES, $job->getMaxRetries(), $job->getRetries())) {
246
                    $objectManager->remove($job);
247
                    continue;
248
                }
249
250
                $job->setRetries($job->getRetries() + 1);
251
                $job->setStatus(BaseJob::STATUS_NEW);
252
                $job->setLocked(null);
253
                $job->setLockedAt(null);
254
                $objectManager->persist($job);
255
                ++$countProcessed;
256
            }
257
            $objectManager->flush();
258
        }
259
260
        return $countProcessed;
261
    }
262
263
    /**
264
     * @param string $workerName
265
     * @param string $method
266
     */
267
    public function pruneStalledJobs($workerName = null, $method = null)
268
    {
269
        $stalledJobs = $this->getStalledJobs($workerName, $method);
270
        $objectManager = $this->getObjectManager();
271
272
        $countProcessed = 0;
273
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
274
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
275
                /** @var RetryableJob $job */
276
                $job = $stalledJobs[$j];
277
                $job->setStalledCount($job->getStalledCount() + 1);
278
                $job->setStatus(BaseJob::STATUS_ERROR);
279
                $job->setMessage('stalled');
280
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount());
281
                $objectManager->remove($job);
282
                ++$countProcessed;
283
            }
284
            $objectManager->flush();
285
        }
286
287
        return $countProcessed;
288
    }
289
290
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
291
    {
292
        $objectManager = $this->getObjectManager();
293
        $objectManager->remove($job);
294
        $objectManager->flush();
295
    }
296
297
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
298
    {
299
        $this->deleteJob($job); // Should cause job to be archived
300
    }
301
302
    public function save(\Dtc\QueueBundle\Model\Job $job)
303
    {
304
        // Todo: Serialize args
305
306
        // Generate crc hash for the job
307
        $hashValues = array($job->getClassName(), $job->getMethod(), $job->getWorkerName(), $job->getArgs());
308
        $crcHash = hash('sha256', serialize($hashValues));
309
        $job->setCrcHash($crcHash);
310
        $objectManager = $this->getObjectManager();
311
312
        if (true === $job->getBatch()) {
313
            // See if similar job that hasn't run exists
314
            $criteria = array('crcHash' => $crcHash, 'status' => BaseJob::STATUS_NEW);
315
            $oldJob = $this->getRepository()->findOneBy($criteria);
316
317
            if ($oldJob) {
318
                // Old job exists - just override fields Set higher priority
319
                $oldJob->setPriority(max($job->getPriority(), $oldJob->getPriority()));
320
                $oldJob->setWhenAt(min($job->getWhenAt(), $oldJob->getWhenAt()));
321
                $oldJob->setBatch(true);
322
                $objectManager->persist($oldJob);
323
                $objectManager->flush();
324
325
                return $oldJob;
326
            }
327
        }
328
329
        // Just save a new job
330
        $this->resetSaveOk(__FUNCTION__);
331
        $objectManager->persist($job);
332
        $objectManager->flush();
333
334
        return $job;
335
    }
336
337
    /**
338
     * @param string $objectName
339
     */
340
    abstract protected function stopIdGenerator($objectName);
341
342
    abstract protected function restoreIdGenerator($objectName);
343
344
    /**
345
     * @param array $criterion
346
     * @param int   $limit
347
     * @param int   $offset
348
     */
349
    private function resetJobsByCriterion(
350
        array $criterion,
351
        $limit,
352
        $offset)
353
    {
354
        $objectManager = $this->getObjectManager();
355
        $this->resetSaveOk(__FUNCTION__);
356
        $objectName = $this->getObjectName();
357
        $archiveObjectName = $this->getArchiveObjectName();
358
        $jobRepository = $objectManager->getRepository($objectName);
359
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
360
        $className = $jobRepository->getClassName();
361
        $metadata = $objectManager->getClassMetadata($className);
362
        $this->stopIdGenerator($objectName);
363
        $identifierData = $metadata->getIdentifier();
364
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
365
        $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);
366
        $countProcessed = 0;
367
368
        foreach ($results as $jobArchive) {
369
            $this->resetJob($jobArchive, $className, $countProcessed);
370
        }
371
        $objectManager->flush();
372
373
        $this->restoreIdGenerator($objectName);
374
375
        return $countProcessed;
376
    }
377
378
    protected function resetSaveOk($function)
379
    {
380
    }
381
382
    /**
383
     * @param RetryableJob $jobArchive
384
     * @param $className
385
     * @param $countProcessed
386
     */
387
    protected function resetJob(RetryableJob $jobArchive, $className, &$countProcessed)
388
    {
389
        $objectManager = $this->getObjectManager();
390
        if ($this->updateMaxStatus($jobArchive, RetryableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) {
391
            $objectManager->persist($jobArchive);
392
393
            return;
394
        }
395
396
        /** @var RetryableJob $job */
397
        $job = new $className();
398
399
        Util::copy($jobArchive, $job);
400
        $job->setStatus(BaseJob::STATUS_NEW);
401
        $job->setLocked(null);
402
        $job->setLockedAt(null);
403
        $job->setMessage(null);
404
        $job->setFinishedAt(null);
405
        $job->setStartedAt(null);
406
        $job->setElapsed(null);
407
        $job->setRetries($job->getRetries() + 1);
408
409
        $objectManager->persist($job);
410
        $objectManager->remove($jobArchive);
411
        ++$countProcessed;
412
    }
413
}
414