Completed
Push — master ( 1204eb...5574d8 )
by Matthew
05:57
created

BaseJobManager::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 12
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 12
ccs 7
cts 7
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 10
nc 1
nop 5
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\Doctrine;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Doctrine\Common\Persistence\ObjectRepository;
7
use Doctrine\ODM\MongoDB\DocumentRepository;
8
use Doctrine\ORM\EntityRepository;
9
use Dtc\QueueBundle\Model\BaseJob;
10
use Dtc\QueueBundle\Model\Job;
11
use Dtc\QueueBundle\Model\PriorityJobManager;
12
use Dtc\QueueBundle\Model\RetryableJob;
13
use Dtc\QueueBundle\Model\Run;
14
use Dtc\QueueBundle\Util\Util;
15
16
abstract class BaseJobManager extends PriorityJobManager
17
{
18
    /** Number of jobs to prune / reset / gather at a time */
19
    const FETCH_COUNT = 100;
20
21
    /** Number of seconds before a job is considered stalled if the runner is no longer active */
22
    const STALLED_SECONDS = 1800;
23
    /** @var ObjectManager Doctrine object manager passed to the constructor */
    protected $objectManager;
24
    /** @var string Class name of the job objects; used to look up the job repository */
    protected $objectName;
25
    /** @var string Class name of the archived job objects */
    protected $archiveObjectName;
26
    /** @var string Class name of the run objects */
    protected $runClass;
27
    /** @var string Class name of the archived run objects */
    protected $runArchiveClass;
28
29
    /**
     * Stores the object manager and the class names this manager operates on.
     *
     * @param ObjectManager $objectManager     manager used for every persistence operation
     * @param string        $objectName        class name of the job objects
     * @param string        $archiveObjectName class name of the archived job objects
     * @param string        $runClass          class name of the run objects
     * @param string        $runArchiveClass   class name of the archived run objects
     */
    public function __construct(
        ObjectManager $objectManager,
        $objectName,
        $archiveObjectName,
        $runClass,
        $runArchiveClass
    ) {
        // Persistence entry point.
        $this->objectManager = $objectManager;

        // Job classes.
        $this->objectName = $objectName;
        $this->archiveObjectName = $archiveObjectName;

        // Run classes.
        $this->runClass = $runClass;
        $this->runArchiveClass = $runArchiveClass;
    }
47
48
    /**
     * Returns the object manager that was passed to the constructor.
     *
     * @return ObjectManager
     */
51 7
    public function getObjectManager()
52
    {
53 7
        return $this->objectManager;
54
    }
55
56
    /**
     * Returns the job class name that was passed to the constructor.
     *
     * @return string
     */
59 7
    public function getObjectName()
60
    {
61 7
        return $this->objectName;
62
    }
63
64
    /**
     * Returns the archived-job class name that was passed to the constructor.
     *
     * @return string
     */
67 6
    public function getArchiveObjectName()
68
    {
69 6
        return $this->archiveObjectName;
70
    }
71
72
    /**
     * Returns the run class name that was passed to the constructor.
     *
     * @return string
     */
75 1
    public function getRunClass()
76
    {
77 1
        return $this->runClass;
78
    }
79
80
    /**
     * Returns the archived-run class name that was passed to the constructor.
     *
     * @return string
     */
83 1
    public function getRunArchiveClass()
84
    {
85 1
        return $this->runArchiveClass;
86
    }
87
88
    /**
     * Looks up the repository for the job class returned by getObjectName().
     *
     * @return ObjectRepository
     */
    public function getRepository()
    {
        $manager = $this->getObjectManager();
        $jobClass = $this->getObjectName();

        return $manager->getRepository($jobClass);
    }
95
96
    /**
     * Counts the jobs of the given class that have the given status.
     *
     * Callers in this class use the result as the upper bound when paging
     * through matching jobs in steps of FETCH_COUNT.
     *
     * @param string      $objectName class name of the jobs to count
     * @param mixed       $status     status to match (callers pass Job::STATUS_* constants)
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return int NOTE(review): assumed to be an integer count — confirm in implementations
     */
99
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);
100
101 1
    /**
     * Resets archived jobs that are in the error state so they can run again.
     *
     * The archive is processed in batches of FETCH_COUNT. Every job handled by a
     * batch either gets a different status or is removed from the archive, so it
     * no longer matches the criterion once the batch has been flushed. Each batch
     * is therefore read from offset 0: advancing the offset would skip the jobs
     * that moved up into the positions vacated by the previous batch.
     *
     * @param string|null $workerName only reset jobs of this worker, when given
     * @param string|null $method     only reset jobs of this method, when given
     *
     * @return int number of jobs that were re-created as new jobs
     */
    public function resetErroneousJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getArchiveObjectName(), Job::STATUS_ERROR, $workerName, $method);

        $criterion = ['status' => Job::STATUS_ERROR];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $countProcessed = 0;
        // $i only bounds the number of batches; the read position stays at the
        // head of the remaining matches because processed rows drop out of the
        // result set.
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $countProcessed += $this->resetJobsByCriterion($criterion, static::FETCH_COUNT, 0);
        }

        return $countProcessed;
    }
116
117
    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return mixed pruneExpiredJobs() uses the result as a loop bound, so it is
     *               presumably the number of jobs marked expired — confirm in implementations
     */
125
    abstract protected function updateExpired($workerName = null, $method = null);
126
127 3
    /**
     * Adds the optional worker name and method filters to a search criterion.
     *
     * A filter is only added when its value is not null; existing entries in
     * $criterion under the same key are overwritten.
     *
     * @param array       $criterion  criterion to extend (modified in place)
     * @param string|null $workerName value for the 'workerName' key
     * @param string|null $method     value for the 'method' key
     */
    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        $optionalFilters = [
            'workerName' => $workerName,
            'method' => $method,
        ];

        foreach ($optionalFilters as $field => $value) {
            if (null === $value) {
                continue;
            }
            $criterion[$field] = $value;
        }
    }
136
137 1
    /**
     * Marks expired jobs as expired and removes them from the job repository.
     *
     * Jobs are removed in batches of FETCH_COUNT with a flush after each batch.
     * Because a flushed batch no longer exists in the repository, every batch is
     * read from offset 0; advancing the offset would skip the jobs that moved up
     * into the positions of the ones just removed.
     *
     * @param string|null $workerName only prune jobs of this worker, when given
     * @param string|null $method     only prune jobs of this method, when given
     *
     * @return int number of jobs removed
     */
    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;

        // $i only bounds the number of batches; see the method description for
        // why the read offset stays at 0.
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, 0);
            if (empty($expiredJobs)) {
                // Nothing left to prune, so further batches would be empty too.
                break;
            }

            foreach ($expiredJobs as $expiredJob) {
                /* @var Job $expiredJob */
                $expiredJob->setStatus(Job::STATUS_EXPIRED);
                $objectManager->remove($expiredJob);
                ++$finalCount;
            }
            $this->flush();
        }

        return $finalCount;
    }
160
161 2
    /**
     * Flushes pending changes through the object manager.
     */
    protected function flush()
    {
        $manager = $this->getObjectManager();
        $manager->flush();
    }
165
166 1
    /**
     * Collects the running jobs that are considered stalled.
     *
     * @param string|null $workerName optional worker name filter
     * @param string|null $method     optional method filter
     *
     * @return array list of stalled jobs as produced by extractStalledJobs()
     */
    protected function getStalledJobs($workerName = null, $method = null)
    {
        $runningCount = $this->countJobsByStatus(
            $this->getObjectName(),
            Job::STATUS_RUNNING,
            $workerName,
            $method
        );

        $filter = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($filter, $workerName, $method);

        return $this->extractStalledJobs($this->findRunningJobs($filter, $runningCount));
    }
177
178 1
    /**
     * Loads the jobs matching the criterion and groups them by run id.
     *
     * Jobs are fetched in pages of FETCH_COUNT up to $count; jobs without a run
     * id are left out of the result.
     *
     * @param array $criterion criterion passed to the repository's findBy()
     * @param int   $count     number of matching jobs to page through
     *
     * @return array map of run id => list of jobs carrying that run id
     */
    protected function findRunningJobs($criterion, $count)
    {
        $jobRepository = $this->getRepository();
        $jobsByRunId = [];

        for ($offset = 0; $offset < $count; $offset += static::FETCH_COUNT) {
            $page = $jobRepository->findBy($criterion, null, static::FETCH_COUNT, $offset);
            if (empty($page)) {
                continue;
            }

            foreach ($page as $job) {
                /** @var RetryableJob $job */
                $runId = $job->getRunId();
                if (null === $runId) {
                    continue;
                }
                $jobsByRunId[$runId][] = $job;
            }
        }

        return $jobsByRunId;
    }
197
198
    /**
     * Appends the jobs of a still-existing run that the run is not currently working on.
     *
     * When a run with the given id is found in the run repository, every job in
     * $jobs whose id differs from the run's current job id is added to
     * $stalledJobs. Nothing is added when no such run is found.
     *
     * @param mixed $runId       id of the run the jobs claim to belong to
     * @param array $jobs        running jobs carrying that run id
     * @param array $stalledJobs list the stalled jobs are appended to (modified in place)
     */
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
    {
        $liveRun = $this->getObjectManager()->getRepository($this->runClass)->find($runId);
        if (!$liveRun) {
            return;
        }

        foreach ($jobs as $job) {
            // Loose comparison is kept so ids of differing scalar types still match.
            if ($liveRun->getCurrentJobId() != $job->getId()) {
                $stalledJobs[] = $job;
            }
        }
    }
216
217
    /**
     * Picks the stalled jobs out of the running jobs grouped by run id.
     *
     * For every run id, extractStalledLiveRuns() contributes the jobs of a run
     * that still exists but is not working on them. All jobs of a run are added
     * as well when that run is found in the run archive and ended more than
     * STALLED_SECONDS ago.
     *
     * @param array $runningJobsById map of run id => list of running jobs
     *
     * @return array list of stalled jobs
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        /** @var EntityRepository|DocumentRepository $archivedRuns */
        $archivedRuns = $this->getObjectManager()->getRepository($this->runArchiveClass);

        $stalled = [];
        foreach ($runningJobsById as $runId => $jobsOfRun) {
            $this->extractStalledLiveRuns($runId, $jobsOfRun, $stalled);

            /** @var Run $archivedRun */
            $archivedRun = $archivedRuns->find($runId);
            if (!$archivedRun) {
                continue;
            }

            $endedAt = $archivedRun->getEndedAt();
            if (!$endedAt) {
                continue;
            }

            // Did the run end more than STALLED_SECONDS ago?
            if ((time() - $endedAt->getTimestamp()) > static::STALLED_SECONDS) {
                $stalled = array_merge($stalled, $jobsOfRun);
            }
        }

        return $stalled;
    }
244
245 2
    /**
     * Applies a "maximum reached" status to a job when a counter has hit its limit.
     *
     * @param RetryableJob $job    job whose status may be changed
     * @param mixed        $status status to set when the limit is reached
     * @param int|null     $max    limit; null means there is no limit
     * @param int          $count  current value of the counter
     *
     * @return bool true when the status was set, false otherwise
     */
    protected function updateMaxStatus(RetryableJob $job, $status, $max = null, $count = 0)
    {
        $limitReached = null !== $max && $count >= $max;
        if ($limitReached) {
            $job->setStatus($status);
        }

        return $limitReached;
    }
255
256
    /**
     * Returns the current status of the given job.
     *
     * runStalledLoop() compares the result with BaseJob::STATUS_RUNNING before
     * touching a job; NOTE(review): presumably implementations re-read the status
     * from the data store rather than from the in-memory object — confirm.
     *
     * @param Job $job job to look up
     *
     * @return mixed the job's status
     */
    abstract protected function getJobCurrentStatus(Job $job);
257
258 1
    /**
     * Processes one batch of stalled jobs, starting at index $i.
     *
     * Jobs that are no longer reported as running are skipped. For the others
     * the stalled counter is incremented; a job that has reached its maximum
     * stalled count or maximum retries is removed, any other job is reset to a
     * new, unlocked job with one more retry and counted in $countProcessed.
     * Nothing is flushed here.
     *
     * @param int   $i              index of the first job of the batch
     * @param int   $count          total number of stalled jobs
     * @param array $stalledJobs    list of stalled jobs, indexed from 0
     * @param int   $countProcessed running total of re-queued jobs (modified in place)
     */
    protected function runStalledLoop($i, $count, array $stalledJobs, &$countProcessed)
    {
        $manager = $this->getObjectManager();
        $batchEnd = min($i + static::FETCH_COUNT, $count);

        for ($index = $i; $index < $batchEnd; ++$index) {
            /** @var RetryableJob $job */
            $job = $stalledJobs[$index];

            // Query the data store to make sure the job is still marked running
            if (BaseJob::STATUS_RUNNING !== $this->getJobCurrentStatus($job)) {
                continue;
            }

            $job->setStalledCount($job->getStalledCount() + 1);

            // The retry limit is only checked when the stalled limit was not reached.
            $exhausted = $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount())
                || $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_RETRIES, $job->getMaxRetries(), $job->getRetries());
            if ($exhausted) {
                $manager->remove($job);
                continue;
            }

            $job->setRetries($job->getRetries() + 1);
            $job->setStatus(BaseJob::STATUS_NEW);
            $job->setLocked(null);
            $job->setLockedAt(null);
            $manager->persist($job);
            ++$countProcessed;
        }
    }
288
289 1
    /**
     * Puts stalled jobs back into the queue.
     *
     * The stalled jobs are handled in batches of FETCH_COUNT by runStalledLoop(),
     * with a flush after every batch.
     *
     * @param string|null $workerName only reset jobs of this worker, when given
     * @param string|null $method     only reset jobs of this method, when given
     *
     * @return int number of jobs that were reset to new
     */
    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);

        $countProcessed = 0;
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
            $this->runStalledLoop($i, $count, $stalledJobs, $countProcessed);
            $this->flush();
        }

        return $countProcessed;
    }
302
303
    /**
     * Removes stalled jobs instead of re-queueing them.
     *
     * Each stalled job gets its stalled counter incremented, is marked as an
     * error with the message 'stalled' (or as max-stalled when that limit is
     * reached) and is then removed. Changes are flushed after every batch of
     * FETCH_COUNT jobs.
     *
     * @param string $workerName
     * @param string $method
     *
     * @return int number of jobs removed
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);
        $manager = $this->getObjectManager();

        $pruned = 0;
        foreach (array_chunk($stalledJobs, static::FETCH_COUNT) as $batch) {
            foreach ($batch as $job) {
                /** @var RetryableJob $job */
                $job->setStalledCount($job->getStalledCount() + 1);
                $job->setStatus(BaseJob::STATUS_ERROR);
                $job->setMessage('stalled');
                $this->updateMaxStatus($job, RetryableJob::STATUS_MAX_STALLED, $job->getMaxStalled(), $job->getStalledCount());
                $manager->remove($job);
                ++$pruned;
            }
            $this->flush();
        }

        return $pruned;
    }
329
330 3
    /**
     * Removes the job through the object manager and flushes immediately.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to remove
     */
    public function deleteJob(\Dtc\QueueBundle\Model\Job $job)
    {
        $manager = $this->getObjectManager();

        $manager->remove($job);
        $manager->flush();
    }
336
337 1
    /**
     * Saves the job's history by deleting the job via deleteJob().
     *
     * @param \Dtc\QueueBundle\Model\Job $job job whose history should be kept
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
338
    {
339 1
        $this->deleteJob($job); // Should cause job to be archived
340 1
    }
341
342 5
    /**
     * Saves a job, or folds it into an existing batch job.
     *
     * The job first receives a SHA-256 hash of its class name, method, worker
     * name and arguments. When the job is flagged as a batch job and
     * updateNearestBatch() returns an existing job, that job is returned and
     * nothing new is stored. Otherwise the job is persisted and flushed.
     *
     * @param \Dtc\QueueBundle\Model\Job $job job to save
     *
     * @return mixed the saved job, or the job returned by updateNearestBatch()
     */
    protected function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        // Generate the hash for the job
        $fingerprint = serialize([
            $job->getClassName(),
            $job->getMethod(),
            $job->getWorkerName(),
            $job->getArgs(),
        ]);
        $job->setCrcHash(hash('sha256', $fingerprint));

        $manager = $this->getObjectManager();

        if (true === $job->getBatch()) {
            $existingJob = $this->updateNearestBatch($job);
            if ($existingJob) {
                return $existingJob;
            }
        }

        // Just save a new job
        $this->resetSaveOk(__FUNCTION__);
        $manager->persist($job);
        $manager->flush();

        return $job;
    }
364
365
    /**
     * Tries to merge a batch job into a job that is already stored.
     *
     * prioritySave() calls this for jobs flagged as batch jobs: a truthy result
     * is returned to the caller in place of the new job, a falsy result makes
     * prioritySave() store the job as usual.
     *
     * @param Job $job batch job about to be saved
     *
     * @return mixed NOTE(review): presumably the existing job that was updated,
     *               or null when none was found — confirm in implementations
     */
    abstract protected function updateNearestBatch(Job $job);
366
367
    /**
     * Called by resetJobsByCriterion() before archived jobs are re-created.
     *
     * NOTE(review): presumably turns off automatic id generation for the class so
     * re-created jobs keep the id copied from the archive — confirm in implementations.
     *
     * @param string $objectName class name of the job objects
     */
370
    abstract protected function stopIdGenerator($objectName);
371
372
    /**
     * Called by resetJobsByCriterion() after the re-created jobs were flushed.
     *
     * NOTE(review): presumably undoes stopIdGenerator() for the class — confirm
     * in implementations.
     *
     * @param string $objectName class name of the job objects
     */
    abstract protected function restoreIdGenerator($objectName);
373
374
    /**
     * Resets one page of archived jobs that match the criterion.
     *
     * The matching archived jobs are read ordered by the job class's identifier
     * and handed to resetJob() one by one; the changes are flushed once at the
     * end. The id generator is stopped for the duration of the work and is
     * restored afterwards even when reading, resetting or flushing throws, so a
     * failure cannot leave it stopped.
     *
     * @param array $criterion criterion passed to the archive repository's findBy()
     * @param int   $limit     maximum number of archived jobs to read
     * @param int   $offset    number of matching archived jobs to skip
     *
     * @return int number of jobs that were re-created as new jobs
     */
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset)
    {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getObjectName();
        $archiveObjectName = $this->getArchiveObjectName();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $countProcessed = 0;

        $this->stopIdGenerator($objectName);
        try {
            $identifierData = $metadata->getIdentifier();
            $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
            $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);

            foreach ($results as $jobArchive) {
                $this->resetJob($jobArchive, $className, $countProcessed);
            }
            $objectManager->flush();
        } finally {
            // Always undo stopIdGenerator(), including on failure.
            $this->restoreIdGenerator($objectName);
        }

        return $countProcessed;
    }
407
408 5
    /**
     * Hook called before jobs are saved or reset; this implementation does nothing.
     *
     * @param string $function name of the calling method (callers pass __FUNCTION__)
     */
    protected function resetSaveOk($function)
409
    {
410 5
    }
411
412
    /**
     * Turns one archived job back into a new job, unless it is out of retries.
     *
     * When the archived job has reached its maximum number of retries it is
     * given the max-retries status and stays in the archive. Otherwise its data
     * is copied into a fresh instance of $className, the run-related fields are
     * cleared, the retry counter is incremented, the new job is persisted and
     * the archived job is removed. Nothing is flushed here.
     *
     * @param RetryableJob $jobArchive     archived job to reset
     * @param string       $className      class to instantiate for the new job
     * @param int          $countProcessed running total of re-created jobs (modified in place)
     */
    protected function resetJob(RetryableJob $jobArchive, $className, &$countProcessed)
    {
        $manager = $this->getObjectManager();

        $outOfRetries = $this->updateMaxStatus(
            $jobArchive,
            RetryableJob::STATUS_MAX_RETRIES,
            $jobArchive->getMaxRetries(),
            $jobArchive->getRetries()
        );
        if ($outOfRetries) {
            $manager->persist($jobArchive);

            return;
        }

        /** @var RetryableJob $freshJob */
        $freshJob = new $className();
        Util::copy($jobArchive, $freshJob);

        // Clear everything that belonged to the failed run.
        $freshJob->setStatus(BaseJob::STATUS_NEW);
        $freshJob->setLocked(null);
        $freshJob->setLockedAt(null);
        $freshJob->setMessage(null);
        $freshJob->setFinishedAt(null);
        $freshJob->setStartedAt(null);
        $freshJob->setElapsed(null);
        $freshJob->setRetries($freshJob->getRetries() + 1);

        $manager->persist($freshJob);
        $manager->remove($jobArchive);
        ++$countProcessed;
    }
443
}
444