Completed
Pull Request — master (#30)
by Matthew
19:54
created

DoctrineJobManager   C

Complexity

Total Complexity 58

Size/Duplication

Total Lines 415
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Importance

Changes 0
Metric Value
wmc 58
lcom 1
cbo 9
dl 0
loc 415
rs 6.3005
c 0
b 0
f 0

31 Methods

Rating   Name                              Duplication   Size   Complexity
A        __construct()                     0             10     1
A        getObjectManager()                0             4      1
A        getRepository()                   0             4      1
-        countJobsByStatus()               0             1      ?
A        resetExceptionJobs()              0             18     2
-        updateExpired()                   0             1      ?
A        addWorkerNameMethod()             0             9      3
B        pruneExpiredJobs()                0             28     5
A        flush()                           0             4      1
A        getStalledJobs()                  0             11     1
B        findRunningJobs()                 0             19     5
A        extractStalledLiveRuns()          0             13     4
A        extractStalledJobs()              0             10     2
B        extractStalledJobsRunArchive()    0             22     6
A        runStalledLoop()                  0             14     4
A        resetStalledJobs()                0             16     4
A        pruneStalledJobs()                0             19     4
A        deleteJob()                       0             6      1
A        stallableSaveHistory()            0             8      2
A        stallableSave()                   0             22     3
-        updateNearestBatch()              0             1      ?
-        stopIdGenerator()                 0             1      ?
-        restoreIdGenerator()              0             1      ?
B        resetJobsByCriterion()            0             28     3
A        resetSaveOk()                     0             3      1
-        getJobQueryBuilder()              0             1      ?
A        resetArchiveJob()                 0             19     2
A        resetJob()                        0             16     2
-        getWorkersAndMethods()            0             1      ?
-        countLiveJobs()                   0             1      ?
-        archiveAllJobs()                  0             1      ?

How to fix   Complexity   

Complex Class

Complex classes like DoctrineJobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find one is to look for fields and methods that share the same prefixes or suffixes. You can also look at the cohesion graph to spot any unconnected or weakly connected components.
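The prefix/suffix search described above can be roughed out in a few lines. The following is only a sketch: it takes a subset of the method names from the table, splits each camelCase name into words, and groups methods by the words they share. The helpers `splitCamelCase()` and `groupByWord()` and the threshold of three methods are assumptions made for illustration, not part of the analysed code.

```php
<?php

// Method names taken from the table above (a subset, for brevity).
$methods = [
    'getStalledJobs', 'extractStalledLiveRuns', 'extractStalledJobs',
    'extractStalledJobsRunArchive', 'runStalledLoop', 'resetStalledJobs',
    'pruneStalledJobs', 'stallableSaveHistory', 'stallableSave',
    'resetExceptionJobs', 'resetJobsByCriterion', 'resetArchiveJob', 'resetJob',
    'pruneExpiredJobs', 'updateExpired',
];

/**
 * Hypothetical helper: split a camelCase name into lowercase words.
 */
function splitCamelCase(string $name): array
{
    // Split before every uppercase letter, e.g. "getStalledJobs" -> get, Stalled, Jobs.
    $words = preg_split('/(?=[A-Z])/', $name, -1, PREG_SPLIT_NO_EMPTY);

    return array_map('strtolower', $words);
}

/**
 * Hypothetical helper: group method names by each word they contain,
 * keeping only words shared by at least $minMethods methods.
 */
function groupByWord(array $methods, int $minMethods = 3): array
{
    $groups = [];
    foreach ($methods as $method) {
        foreach (array_unique(splitCamelCase($method)) as $word) {
            $groups[$word][] = $method;
        }
    }

    return array_filter($groups, function (array $names) use ($minMethods) {
        return count($names) >= $minMethods;
    });
}

$groups = groupByWord($methods);
foreach ($groups as $word => $names) {
    echo $word, ': ', implode(', ', $names), PHP_EOL;
}
```

Generic words such as "jobs" match almost every method and carry little signal. Groups built around "stalled" or "reset" are more likely to point at a component worth extracting.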

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use DoctrineJobManager, and based on these observations, apply Extract Interface, too.
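As a rough illustration of Extract Class combined with Extract Interface, the sketch below pulls the "has this run stalled?" rule out of a manager into its own small class behind an interface. `StalledRunCheckerInterface` and `StalledRunChecker` are hypothetical names, the 1800-second default mirrors the STALLED_SECONDS constant in the listing, and the current time is passed in explicitly so the rule can be tested without a clock. It is one possible shape under those assumptions, not the bundle's actual design.

```php
<?php

/** Hypothetical interface: the narrow contract a caller would depend on. */
interface StalledRunCheckerInterface
{
    public function isStalled(?DateTimeInterface $endedAt, int $now): bool;
}

/**
 * Hypothetical extracted class: owns the rule that a run counts as stalled
 * once it ended more than a configured number of seconds ago.
 */
final class StalledRunChecker implements StalledRunCheckerInterface
{
    /** @var int */
    private $stalledSeconds;

    public function __construct(int $stalledSeconds = 1800)
    {
        $this->stalledSeconds = $stalledSeconds;
    }

    public function isStalled(?DateTimeInterface $endedAt, int $now): bool
    {
        if (null === $endedAt) {
            // The run has no end time, so this rule does not apply.
            return false;
        }

        return ($now - $endedAt->getTimestamp()) > $this->stalledSeconds;
    }
}

// Usage with a fixed "now" so the outcome does not depend on the clock.
$checker = new StalledRunChecker();
$now = 1700000000;
$endedLongAgo = (new DateTimeImmutable())->setTimestamp($now - 3600);
$endedRecently = (new DateTimeImmutable())->setTimestamp($now - 60);

var_dump($checker->isStalled($endedLongAgo, $now));
var_dump($checker->isStalled($endedRecently, $now));
var_dump($checker->isStalled(null, $now));
```

A manager would then receive a `StalledRunCheckerInterface` through its constructor and delegate to it, which removes one responsibility from the large class and gives callers a smaller type to depend on.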

<?php

namespace Dtc\QueueBundle\Doctrine;

use Doctrine\Common\Persistence\ObjectManager;
use Doctrine\Common\Persistence\ObjectRepository;
use Doctrine\ODM\MongoDB\DocumentRepository;
use Doctrine\ORM\EntityRepository;
use Dtc\QueueBundle\Manager\ArchivableJobManager;
use Dtc\QueueBundle\Model\BaseJob;
use Dtc\QueueBundle\Model\RetryableJob;
use Dtc\QueueBundle\Model\Job;
use Dtc\QueueBundle\Model\JobTiming;
use Dtc\QueueBundle\Manager\JobTimingManager;
use Dtc\QueueBundle\Model\StallableJob;
use Dtc\QueueBundle\Model\Run;
use Dtc\QueueBundle\Manager\RunManager;
use Dtc\QueueBundle\Util\Util;

abstract class DoctrineJobManager extends ArchivableJobManager
{
    /** Number of jobs to prune / reset / gather at a time */
    const FETCH_COUNT = 100;

    /** Number of seconds before a job is considered stalled if the runner is no longer active */
    const STALLED_SECONDS = 1800;

    /**
     * @var ObjectManager
     */
    protected $objectManager;

    /**
     * DoctrineJobManager constructor.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param ObjectManager    $objectManager
     * @param string           $jobClass
     * @param string           $jobArchiveClass
     */
    public function __construct(
        RunManager $runManager,
        JobTimingManager $jobTimingManager,
        ObjectManager $objectManager,
        $jobClass,
        $jobArchiveClass
    ) {
        $this->objectManager = $objectManager;
        parent::__construct($runManager, $jobTimingManager, $jobClass, $jobArchiveClass);
    }

    /**
     * @return ObjectManager
     */
    public function getObjectManager()
    {
        return $this->objectManager;
    }

    /**
     * @return ObjectRepository
     */
    public function getRepository()
    {
        return $this->getObjectManager()->getRepository($this->getJobClass());
    }

    /**
     * @param string $objectName
     */
    abstract protected function countJobsByStatus($objectName, $status, $workerName = null, $method = null);

    public function resetExceptionJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getJobArchiveClass(), Job::STATUS_EXCEPTION, $workerName, $method);

        $criterion = ['status' => Job::STATUS_EXCEPTION];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $countProcessed = 0;
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $countProcessed += $this->resetJobsByCriterion(
                $criterion,
                static::FETCH_COUNT,
                $i
            );
        }

        return $countProcessed;
    }

    /**
     * Sets the status to Job::STATUS_EXPIRED for those jobs that are expired.
     *
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return mixed
     */
    abstract protected function updateExpired($workerName = null, $method = null);

    protected function addWorkerNameMethod(array &$criterion, $workerName = null, $method = null)
    {
        if (null !== $workerName) {
            $criterion['workerName'] = $workerName;
        }
        if (null !== $method) {
            $criterion['method'] = $method;
        }
    }

    public function pruneExpiredJobs($workerName = null, $method = null)
    {
        $count = $this->updateExpired($workerName, $method);
        $criterion = ['status' => Job::STATUS_EXPIRED];
        $this->addWorkerNameMethod($criterion, $workerName, $method);
        $objectManager = $this->getObjectManager();
        $repository = $this->getRepository();
        $finalCount = 0;
        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $expiredJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
            $innerCount = 0;
            if (!empty($expiredJobs)) {
                foreach ($expiredJobs as $expiredJob) {
                    /* @var Job $expiredJob */
                    $expiredJob->setStatus(Job::STATUS_EXPIRED);
                    $objectManager->remove($expiredJob);
                    ++$finalCount;
                    ++$innerCount;
                }
            }
            $this->flush();
            for ($j = 0; $j < $innerCount; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
            }
        }

        return $finalCount;
    }

    protected function flush()
    {
        $this->getObjectManager()->flush();
    }

    protected function getStalledJobs($workerName = null, $method = null)
    {
        $count = $this->countJobsByStatus($this->getJobClass(), Job::STATUS_RUNNING, $workerName, $method);

        $criterion = ['status' => BaseJob::STATUS_RUNNING];
        $this->addWorkerNameMethod($criterion, $workerName, $method);

        $runningJobs = $this->findRunningJobs($criterion, $count);

        return $this->extractStalledJobs($runningJobs);
    }

    protected function findRunningJobs($criterion, $count)
    {
        $repository = $this->getRepository();
        $runningJobsById = [];

        for ($i = 0; $i < $count; $i += static::FETCH_COUNT) {
            $runningJobs = $repository->findBy($criterion, null, static::FETCH_COUNT, $i);
            if (!empty($runningJobs)) {
                foreach ($runningJobs as $job) {
                    /** @var StallableJob $job */
                    if (null !== $runId = $job->getRunId()) {
                        $runningJobsById[$runId][] = $job;
                    }
                }
            }
        }

        return $runningJobsById;
    }

    /**
     * @param $runId
     * @param array $jobs
     * @param array $stalledJobs
     */
    protected function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs)
    {
        $objectManager = $this->getObjectManager();
        $runRepository = $objectManager->getRepository($this->getRunManager()->getRunClass());
        if ($run = $runRepository->find($runId)) {
            foreach ($jobs as $job) {
                if ($run->getCurrentJobId() == $job->getId()) {
                    continue;
                }
                $stalledJobs[] = $job;
            }
        }
    }

    /**
     * @param array $runningJobsById
     *
     * @return array
     */
    protected function extractStalledJobs(array $runningJobsById)
    {
        $stalledJobs = [];
        foreach (array_keys($runningJobsById) as $runId) {
            $this->extractStalledLiveRuns($runId, $runningJobsById[$runId], $stalledJobs);
            $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId);
        }

        return $stalledJobs;
    }

    protected function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId)
    {
        $runManager = $this->getRunManager();
        if (!method_exists($runManager, 'getObjectManager')) {
            return;
        }
        if (!method_exists($runManager, 'getRunArchiveClass')) {
            return;
        }

        /** @var EntityRepository|DocumentRepository $runArchiveRepository */
        $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass());
        /** @var Run $run */
        if ($run = $runArchiveRepository->find($runId)) {
            if ($endTime = $run->getEndedAt()) {
                // Did the run end more than STALLED_SECONDS (30 minutes) ago?
                if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) {
                    $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]);
                }
            }
        }
    }

    protected function runStalledLoop($i, $count, array $stalledJobs)
    {
        $resetCount = 0;
        for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
            /** @var StallableJob $job */
            $job = $stalledJobs[$j];
            $job->setStatus(StallableJob::STATUS_STALLED);
            if ($this->saveHistory($job)) {
                ++$resetCount;
            }
        }

        return $resetCount;
    }

    public function resetStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);

        $countReset = 0;
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
            $resetCount = $this->runStalledLoop($i, $count, $stalledJobs);
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_STALLED);
            }
            $countReset += $resetCount;
            $this->flush();
        }

        return $countReset;
    }

    /**
     * @param string|null $workerName
     * @param string|null $method
     *
     * @return int Number of stalled jobs pruned
     */
    public function pruneStalledJobs($workerName = null, $method = null)
    {
        $stalledJobs = $this->getStalledJobs($workerName, $method);

        $countProcessed = 0;
        for ($i = 0, $count = count($stalledJobs); $i < $count; $i += static::FETCH_COUNT) {
            for ($j = $i, $max = $i + static::FETCH_COUNT; $j < $max && $j < $count; ++$j) {
                /** @var StallableJob $job */
                $job = $stalledJobs[$j];
                $job->setStatus(StallableJob::STATUS_STALLED);
                $job->setStalls(intval($job->getStalls()) + 1);
                $this->deleteJob($job);
                ++$countProcessed;
            }
            $this->flush();
        }

        return $countProcessed;
    }

    public function deleteJob(Job $job)
    {
        $objectManager = $this->getObjectManager();
        $objectManager->remove($job);
        $objectManager->flush();
    }

    protected function stallableSaveHistory(StallableJob $job, $retry)
    {
        if (!$retry) {
            $this->deleteJob($job);
        }

        return $retry;
    }

    protected function stallableSave(StallableJob $job)
    {
        // Generate a SHA-256 hash for the job (stored in the crcHash field)
        $hashValues = [get_class($job), $job->getMethod(), $job->getWorkerName(), $job->getArgs()];
        $crcHash = hash('sha256', serialize($hashValues));
        $job->setCrcHash($crcHash);
        $objectManager = $this->getObjectManager();

        if (true === $job->getBatch()) {
            $oldJob = $this->updateNearestBatch($job);
            if ($oldJob) {
                return $oldJob;
            }
        }

        // Just save a new job
        $this->resetSaveOk(__FUNCTION__);
        $objectManager->persist($job);
        $objectManager->flush();

        return $job;
    }

    abstract protected function updateNearestBatch(Job $job);

    /**
     * @param string $objectName
     */
    abstract protected function stopIdGenerator($objectName);

    abstract protected function restoreIdGenerator($objectName);

    /**
     * @param array $criterion
     * @param int   $limit
     * @param int   $offset
     *
     * @return int Number of jobs reset
     */
    private function resetJobsByCriterion(
        array $criterion,
        $limit,
        $offset
    ) {
        $objectManager = $this->getObjectManager();
        $this->resetSaveOk(__FUNCTION__);
        $objectName = $this->getJobClass();
        $archiveObjectName = $this->getJobArchiveClass();
        $jobRepository = $objectManager->getRepository($objectName);
        $jobArchiveRepository = $objectManager->getRepository($archiveObjectName);
        $className = $jobRepository->getClassName();
        $metadata = $objectManager->getClassMetadata($className);
        $this->stopIdGenerator($objectName);
        $identifierData = $metadata->getIdentifier();
        $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id';
        $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset);
        $countProcessed = 0;

        foreach ($results as $jobArchive) {
            $countProcessed += $this->resetArchiveJob($jobArchive);
        }
        $objectManager->flush();

        $this->restoreIdGenerator($objectName);

        return $countProcessed;
    }

    protected function resetSaveOk($function)
    {
        // No-op in this class; subclasses may override it.
    }

    /**
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     */
    abstract public function getJobQueryBuilder($workerName = null, $methodName = null, $prioritize = true);

    /**
     * @param StallableJob $jobArchive
     *
     * @return int Number of jobs reset
     */
    protected function resetArchiveJob(StallableJob $jobArchive)
    {
        $objectManager = $this->getObjectManager();
        // Reported issue (Bug): It seems like $jobArchive->getMaxRetries() targeting
        // Dtc\QueueBundle\Model\Re...bleJob::getMaxRetries() can also be of type integer; however,
        // Dtc\QueueBundle\Manager\...ager::updateMaxStatus() seems to accept only null. Maybe add an
        // additional type check?
        //
        // This check looks at variables that are passed out again to other methods. If the outgoing
        // method call has stricter type requirements than the method itself, an issue is raised.
        // An additional type check may prevent trouble.
        if ($this->updateMaxStatus($jobArchive, StallableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) {
            $objectManager->persist($jobArchive);

            return 0;
        }

        $className = $this->getJobClass();
        /** @var StallableJob $newJob */
        $newJob = new $className();
        Util::copy($jobArchive, $newJob);
        $this->resetJob($newJob);
        $objectManager->remove($jobArchive);
        $this->flush();

        return 1;
    }

    protected function resetJob(RetryableJob $job)
    {
        if (!$job instanceof StallableJob) {
            throw new \InvalidArgumentException('$job should be instance of '.StallableJob::class);
        }
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setFinishedAt(null);
        $job->setStartedAt(null);
        $job->setElapsed(null);
        $job->setRetries($job->getRetries() + 1);
        $this->getObjectManager()->persist($job);
        $this->flush();

        return true;
    }

    abstract public function getWorkersAndMethods();

    abstract public function countLiveJobs($workerName = null, $methodName = null);

    abstract public function archiveAllJobs($workerName = null, $methodName = null, $progressCallback);
}
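
The issue reported on resetArchiveJob() suggests an additional type check on the value passed to updateMaxStatus(). The signature and body of updateMaxStatus() are not shown in this listing, so the sketch below uses a hypothetical stand-in function, `exceedsMax()`, and assumes that "maximum reached" means the counter is greater than or equal to the limit. It shows two things that usually address this kind of report: documenting the parameter as `int|null` rather than null only, and guarding against other types at runtime.

```php
<?php

/**
 * Hypothetical stand-in for a max-status check; not the bundle's real method.
 *
 * @param int|null $max     Limit, or null when no limit is set (assumed meaning)
 * @param int      $current Current counter value
 *
 * @return bool True when a limit is set and the counter has reached it
 */
function exceedsMax($max, int $current): bool
{
    // Explicit type check: accept only integers or null for the limit.
    if (null !== $max && !is_int($max)) {
        throw new InvalidArgumentException('$max must be an integer or null');
    }

    return null !== $max && $current >= $max;
}

var_dump(exceedsMax(null, 5));
var_dump(exceedsMax(3, 3));
var_dump(exceedsMax(3, 1));
```

If the real method already handles integers and only its docblock declares the parameter as null, correcting the docblock alone may be enough to resolve the report.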