Loop   B
last analyzed

Complexity

Total Complexity 49

Size/Duplication

Total Lines 322
Duplicated Lines 0 %

Test Coverage

Coverage 82.58%

Importance

Changes 3
Bugs 1 Features 0
Metric Value
eloc 114
c 3
b 1
f 0
dl 0
loc 322
ccs 109
cts 132
cp 0.8258
rs 8.48
wmc 49

19 Methods

Rating   Name   Duplication   Size   Complexity  
A getLastRun() 0 3 1
A setProcessTimeout() 0 3 1
A getProcessTimeout() 0 3 1
A setOutput() 0 3 1
A __construct() 0 8 1
A setLogger() 0 3 1
A validateNanoSleep() 0 4 2
A runCurrentJob() 0 16 6
A runLoop() 0 25 3
A isFinishedEndTime() 0 11 3
A checkParameters() 0 8 1
A checkMaxCountDuration() 0 9 6
A validateMaxCountDuration() 0 7 5
A isFinishedJobs() 0 11 3
A getEndTime() 0 10 2
A log() 0 15 4
A reportJob() 0 9 2
A isFinished() 0 10 3
A runJobById() 0 23 3

How to fix   Complexity   

Complex Class

Complex classes like Loop often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Loop, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace Dtc\QueueBundle\Run;
4
5
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
6
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
7
use Dtc\QueueBundle\Manager\JobManagerInterface;
8
use Dtc\QueueBundle\Manager\RunManager;
9
use Dtc\QueueBundle\Manager\WorkerManager;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\Job;
12
use Dtc\QueueBundle\Model\Run;
13
use Dtc\QueueBundle\Util\Util;
14
use Psr\Log\LoggerInterface;
0 ignored issues
show
Bug introduced by
The type Psr\Log\LoggerInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
15
use Symfony\Component\Console\Output\OutputInterface;
0 ignored issues
show
Bug introduced by
The type Symfony\Component\Console\Output\OutputInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
16
17
class Loop
18
{
19
    /** @var OutputInterface */
20
    protected $output;
21
22
    /** @var LoggerInterface */
23
    protected $logger;
24
25
    /** @var WorkerManager */
26
    protected $workerManager;
27
28
    /** @var JobManagerInterface */
29
    protected $jobManager;
30
31
    /** @var RunManager */
32
    protected $runManager;
33
34
    /** @var int */
35
    protected $processTimeout;
36
37
    /** @var Run */
38
    protected $lastRun;
39
40 3
    public function __construct(
41
        WorkerManager $workerManager,
42
        JobManagerInterface $jobManager,
43
        RunManager $runManager
44
    ) {
45 3
        $this->workerManager = $workerManager;
46 3
        $this->jobManager = $jobManager;
47 3
        $this->runManager = $runManager;
48 3
    }
49
50
    /**
51
     * @return Run|null
52
     */
53 3
    public function getLastRun()
54
    {
55 3
        return $this->lastRun;
56
    }
57
58
    /**
59
     * @return int
60
     */
61
    public function getProcessTimeout()
62
    {
63
        return $this->processTimeout;
64
    }
65
66
    /**
67
     * @param int $processTimeout
68
     */
69
    public function setProcessTimeout($processTimeout)
70
    {
71
        $this->processTimeout = $processTimeout;
72
    }
73
74
    public function setLogger(LoggerInterface $logger)
75
    {
76
        $this->logger = $logger;
77
    }
78
79 1
    public function setOutput(OutputInterface $output)
80
    {
81 1
        $this->output = $output;
82 1
    }
83
84
    /**
85
     * @param float $start
86
     */
87 2
    public function runJobById($start, $jobId): void
88
    {
89 2
        $run = $this->runManager->runStart($start, null, null, $this->processTimeout);
90 2
        $this->lastRun = $run;
91
92 2
        if (!$this->jobManager instanceof DoctrineJobManager) {
93 1
            throw new ClassNotSubclassException("Can't get job by id when not using a database/datastore backed queue (such as mongodb or an RDBMS)");
94
        }
95
96
        /** @var Job $job */
97 1
        $job = $this->jobManager->getRepository()->find($jobId);
98 1
        if (!$job) {
0 ignored issues
show
introduced by
$job is of type Dtc\QueueBundle\Model\Job, thus it always evaluated to true.
Loading history...
99
            $this->log('error', "Job id is not found: {$jobId}");
100
            $this->runManager->runStop($run, $start);
101
102
            return;
103
        }
104
105 1
        $job = $this->workerManager->runJob($job);
106 1
        $this->reportJob($job);
107 1
        $run->setProcessed(1);
108 1
        $this->runManager->runStop($run, $start);
0 ignored issues
show
Bug introduced by
$start of type double is incompatible with the type integer|null expected by parameter $start of Dtc\QueueBundle\Manager\RunManager::runStop(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

108
        $this->runManager->runStop($run, /** @scrutinizer ignore-type */ $start);
Loading history...
109 1
        $this->log('info', 'Ended with 1 job processed over '.strval($run->getElapsed()).' seconds.');
110
    }
111 1
112
    /**
113
     * @param float    $start
114
     * @param int      $nanoSleep
115
     * @param int|null $maxCount
116
     * @param int|null $duration
117
     */
118
    public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
119
    {
120 3
        $this->checkParameters($nanoSleep, $maxCount, $duration);
121
        $this->workerManager->setLoggingFunc([$this, 'log']);
122 3
        $run = $this->runManager->runStart($start, $maxCount, $duration, $this->processTimeout);
123 3
        $this->lastRun = $run;
124 3
        try {
125 3
            $this->log('info', 'Starting up a new job...');
126
127 3
            $endTime = $this->getEndTime($run, $duration);
128
            $currentJob = 1;
129 3
            $noMoreJobsToRun = false;
130 3
            do {
131 3
                $job = $this->workerManager->run($workerName, $methodName, true, $run->getId());
132
                $this->runManager->recordHeartbeat($run, $start, $job);
133 3
                $this->runCurrentJob($run, $job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
134 3
            } while (!$this->isFinished($maxCount, $duration, $endTime, $currentJob, $noMoreJobsToRun));
135 3
        } catch (\Exception $e) {
136 3
            // Uncaught error: possibly with QueueBundle itself
137
            $this->log('critical', $e->getMessage(), ['trace' => $e->getTraceAsString()]);
138
        }
139
        $this->runManager->runStop($run, $start);
0 ignored issues
show
Bug introduced by
$start of type double is incompatible with the type integer|null expected by parameter $start of Dtc\QueueBundle\Manager\RunManager::runStop(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

139
        $this->runManager->runStop($run, /** @scrutinizer ignore-type */ $start);
Loading history...
140
        $this->log('info', 'Ended with '.$run->getProcessed().' job(s) processed over '.strval($run->getElapsed()).' seconds.');
141 3
142 3
        return 0;
143
    }
144 3
145
    /**
146
     * @param int      $nanoSleep
147
     * @param int|null $maxCount
148
     * @param int|null $duration
149
     *
150
     * @throws \InvalidArgumentException
151
     */
152
    private function checkParameters(&$nanoSleep, &$maxCount, &$duration)
153
    {
154 3
        $maxCount = Util::validateIntNull('maxCount', $maxCount, 32);
155
        $duration = Util::validateIntNull('duration', $duration, 32);
156 3
        $nanoSleep = Util::validateIntNull('nanoSleep', $nanoSleep, 63);
157 3
158 3
        $this->validateNanoSleep($nanoSleep);
159
        $this->validateMaxCountDuration($maxCount, $duration);
160 3
    }
161 3
162 3
    /**
163
     * @param int|null $maxCount
164
     * @param int|null $duration
165
     *
166
     * @throws \InvalidArgumentException
167
     */
168
    protected function validateMaxCountDuration($maxCount, $duration)
169
    {
170 3
        if (0 === $maxCount && 0 === $duration) {
171
            throw new \InvalidArgumentException('maxCount and duration can not both be 0');
172 3
        }
173 2
        if (null === $maxCount && null === $duration) {
174
            throw new \InvalidArgumentException('maxCount and duration can not both be null');
175 3
        }
176
    }
177
178 3
    /**
179
     * @param int|null $nanoSleep
180
     *
181
     * @throws \InvalidArgumentException
182
     */
183
    protected function validateNanoSleep($nanoSleep)
184
    {
185 3
        if (null === $nanoSleep) {
186
            throw new \InvalidArgumentException("nanoSleep can't be null");
187 3
        }
188
    }
189
190 3
    /**
191
     * @param int|null $duration
192
     *
193
     * @return \DateTime|null
194
     */
195
    protected function getEndTime(Run $run, $duration)
196
    {
197 3
        $endTime = null;
198
        if (null !== $duration) {
199 3
            $interval = new \DateInterval("PT{$duration}S");
200 3
            $endTime = clone $run->getStartedAt();
201 2
            $endTime->add($interval);
202 2
        }
203 2
204
        return $endTime;
205
    }
206 3
207
    /**
208
     * @param Run      $run
209
     * @param Job|null $job
210
     * @param bool     $noMoreJobsToRun
211
     * @param int      $currentJob
212
     * @param int|null $duration
213
     * @param int      $nanoSleep
214
     */
215
    protected function runCurrentJob($run, $job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
216
    {
217 3
        if (null !== $job) {
218
            $noMoreJobsToRun = false;
219 3
            $this->reportJob($job);
220 3
            $this->runManager->updateProcessed($run, $currentJob);
221 3
            ++$currentJob;
222 3
        } else {
223 3
            if (!$noMoreJobsToRun) {
224
                $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
225 3
                $noMoreJobsToRun = true;
226 3
            }
227 3
            if (null !== $duration) {
228
                if ($nanoSleep > 0) {
229 3
                    $nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep);
230 2
                    time_nanosleep(0, $nanoSleepTime);
231 2
                }
232 2
            }
233
        }
234
    }
235
236 3
    /**
237
     * @param $maxCount
238
     * @param $duration
239
     * @param $processTimeout
240
     */
241
    public function checkMaxCountDuration(&$maxCount, &$duration, &$processTimeout)
242
    {
243 1
        if (null !== $duration && null !== $processTimeout && $duration >= $processTimeout) {
244
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".(2 * $processTimeout));
245 1
            $processTimeout *= 2;
246
        }
247
248
        if (null === $maxCount && null === $duration) {
249
            $maxCount = 1;
250 1
        }
251 1
    }
252
253 1
    /**
254
     * Determine if the run loop is finished.
255
     *
256
     * @param int|null       $maxCount
257
     * @param int            $currentJob
258
     * @param \DateTime|null $endTime
259
     * @param bool           $noMoreJobsToRun
260
     *
261
     * @return bool
262
     */
263
    protected function isFinished($maxCount, $duration, $endTime, $currentJob, $noMoreJobsToRun)
264
    {
265 3
        if (null === $maxCount) {
266
            return $this->isFinishedEndTime($duration, $endTime);
0 ignored issues
show
Bug introduced by
It seems like $endTime can also be of type null; however, parameter $endTime of Dtc\QueueBundle\Run\Loop::isFinishedEndTime() does only seem to accept DateTime, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

266
            return $this->isFinishedEndTime($duration, /** @scrutinizer ignore-type */ $endTime);
Loading history...
267 3
        }
268 2
        if ($currentJob <= $maxCount) {
269
            return $this->isFinishedJobs($duration, $endTime, $noMoreJobsToRun);
270 3
        }
271 3
272
        return true;
273
    }
274 3
275
    /**
276
     * @param \DateTime|null $endTime
277
     * @param bool           $noMoreJobsToRun
278
     *
279
     * @return bool
280
     */
281
    protected function isFinishedJobs($duration, $endTime, $noMoreJobsToRun)
282
    {
283 3
        if (null === $endTime) { // This means that there is a $maxCount as we force one or the other to be not null
284
            if ($noMoreJobsToRun) {
285 3
                return true;
286 3
            }
287 3
288
            return false;
289
        }
290 3
291
        return $this->isFinishedEndTime($duration, $endTime);
292
    }
293
294
    /**
295
     * @return bool
296
     */
297
    protected function isFinishedEndTime($duration, \DateTime $endTime)
298
    {
299 2
        if (0 === $duration) {
300
            return false;
301 2
        }
302
        $now = Util::getMicrotimeDateTime();
303
        if ($endTime > $now) {
304 2
            return false;
305 2
        }
306 2
307
        return true;
308
    }
309 2
310
    protected function reportJob(Job $job)
311
    {
312 3
        if (BaseJob::STATUS_EXCEPTION == $job->getStatus()) {
313
            $message = "Exception with job id: {$job->getId()}\n".$job->getMessage();
314 3
            $this->log('error', $message);
315
        }
316
317
        $message = "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n";
318
        $this->log('info', $message);
319 3
    }
320 3
321 3
    /**
322
     * @param string $level
323
     */
324
    public function log($level, $msg, array $context = [])
325
    {
326 3
        if ($this->logger) {
327
            $this->logger->$level($msg, $context);
328 3
329
            return;
330
        }
331
332
        if ($this->output) {
333
            $date = \Dtc\QueueBundle\Util\Util::getMicrotimeDateTime();
334 3
            $this->output->write("[$level] [".$date->format('c').'] '.$msg);
335 1
            if (!empty($context)) {
336 1
                $this->output->write(print_r($context, true));
337 1
            }
338 1
            $this->output->writeln('');
339
        }
340 1
    }
341
}
342