Passed
Push — master ( 3b1884...a67928 )
by Matthew
08:16
created

Loop::isFinishedEndTime()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3.0261

Importance

Changes 0
Metric Value
cc 3
eloc 6
c 0
b 0
f 0
nc 3
nop 2
dl 0
loc 11
ccs 6
cts 7
cp 0.8571
crap 3.0261
rs 10
1
<?php
2
3
namespace Dtc\QueueBundle\Run;
4
5
use Dtc\QueueBundle\Doctrine\DoctrineJobManager;
6
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
7
use Dtc\QueueBundle\Manager\JobManagerInterface;
8
use Dtc\QueueBundle\Manager\RunManager;
9
use Dtc\QueueBundle\Manager\WorkerManager;
10
use Dtc\QueueBundle\Model\BaseJob;
11
use Dtc\QueueBundle\Model\Job;
12
use Dtc\QueueBundle\Model\Run;
13
use Dtc\QueueBundle\Util\Util;
14
use Psr\Log\LoggerInterface;
0 ignored issues
show
Bug introduced by
The type Psr\Log\LoggerInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
15
use Symfony\Component\Console\Output\OutputInterface;
0 ignored issues
show
Bug introduced by
The type Symfony\Component\Console\Output\OutputInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
16
17
class Loop
18
{
19
    /** @var OutputInterface */
20
    protected $output;
21
22
    /** @var LoggerInterface */
23
    protected $logger;
24
25
    /** @var WorkerManager */
26
    protected $workerManager;
27
28
    /** @var JobManagerInterface */
29
    protected $jobManager;
30
31
    /** @var RunManager */
32
    protected $runManager;
33
34
    /** @var int */
35
    protected $processTimeout;
36
37
    /** @var Run */
38
    protected $lastRun;
39
40 3
    public function __construct(
41
        WorkerManager $workerManager,
42
        JobManagerInterface $jobManager,
43
        RunManager $runManager
44
    ) {
45 3
        $this->workerManager = $workerManager;
46 3
        $this->jobManager = $jobManager;
47 3
        $this->runManager = $runManager;
48 3
    }
49
50
    /**
51
     * @return Run|null
52
     */
53 3
    public function getLastRun()
54
    {
55 3
        return $this->lastRun;
56
    }
57
58
    /**
59
     * @return int
60
     */
61
    public function getProcessTimeout()
62
    {
63
        return $this->processTimeout;
64
    }
65
66
    /**
67
     * @param int $processTimeout
68
     */
69
    public function setProcessTimeout($processTimeout)
70
    {
71
        $this->processTimeout = $processTimeout;
72
    }
73
74
    public function setLogger(LoggerInterface $logger)
75
    {
76
        $this->logger = $logger;
77
    }
78
79 1
    public function setOutput(OutputInterface $output)
80
    {
81 1
        $this->output = $output;
82 1
    }
83
84
    /**
85
     * @param float $start
86
     */
87 2
    public function runJobById($start, $jobId)
88
    {
89 2
        $run = $this->runManager->runStart($start, null, null, $this->processTimeout);
90 2
        $this->lastRun = $run;
91
92 2
        if (!$this->jobManager instanceof DoctrineJobManager) {
93 1
            throw new ClassNotSubclassException("Can't get job by id when not using a database/datastore backed queue (such as mongodb or an RDBMS)");
94
        }
95
96
        /** @var Job $job */
97 1
        $job = $this->jobManager->getRepository()->find($jobId);
98 1
        if (!$job) {
0 ignored issues
show
introduced by
$job is of type Dtc\QueueBundle\Model\Job, thus it always evaluated to true.
Loading history...
99
            $this->log('error', "Job id is not found: {$jobId}");
100
            $this->runManager->runStop($run, $start);
101
102
            return 0;
103
        }
104
105 1
        $job = $this->workerManager->runJob($job);
106 1
        $this->reportJob($job);
107 1
        $run->setProcessed(1);
108 1
        $this->runManager->runStop($run, $start);
0 ignored issues
show
Bug introduced by
$start of type double is incompatible with the type integer|null expected by parameter $start of Dtc\QueueBundle\Manager\RunManager::runStop(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

108
        $this->runManager->runStop($run, /** @scrutinizer ignore-type */ $start);
Loading history...
109 1
        $this->log('info', 'Ended with 1 job processed over '.strval($run->getElapsed()).' seconds.');
110
111 1
        return 0;
112
    }
113
114
    /**
115
     * @param float    $start
116
     * @param int      $nanoSleep
117
     * @param int|null $maxCount
118
     * @param int|null $duration
119
     */
120 3
    public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
121
    {
122 3
        $this->checkParameters($nanoSleep, $maxCount, $duration);
123 3
        $this->workerManager->setLoggingFunc([$this, 'log']);
124 3
        $run = $this->runManager->runStart($start, $maxCount, $duration, $this->processTimeout);
125 3
        $this->lastRun = $run;
126
        try {
127 3
            $this->log('info', 'Starting up a new job...');
128
129 3
            $endTime = $this->getEndTime($run, $duration);
130 3
            $currentJob = 1;
131 3
            $noMoreJobsToRun = false;
132
            do {
133 3
                $job = $this->workerManager->run($workerName, $methodName, true, $run->getId());
134 3
                $this->runManager->recordHeartbeat($run, $start, $job);
135 3
                $this->runCurrentJob($run, $job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
136 3
            } while (!$this->isFinished($maxCount, $duration, $endTime, $currentJob, $noMoreJobsToRun));
137
        } catch (\Exception $e) {
138
            // Uncaught error: possibly with QueueBundle itself
139
            $this->log('critical', $e->getMessage(), ['trace' => $e->getTraceAsString()]);
140
        }
141 3
        $this->runManager->runStop($run, $start);
0 ignored issues
show
Bug introduced by
$start of type double is incompatible with the type integer|null expected by parameter $start of Dtc\QueueBundle\Manager\RunManager::runStop(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

141
        $this->runManager->runStop($run, /** @scrutinizer ignore-type */ $start);
Loading history...
142 3
        $this->log('info', 'Ended with '.$run->getProcessed().' job(s) processed over '.strval($run->getElapsed()).' seconds.');
143
144 3
        return 0;
145
    }
146
147
    /**
148
     * @param int      $nanoSleep
149
     * @param int|null $maxCount
150
     * @param int|null $duration
151
     *
152
     * @throws \InvalidArgumentException
153
     */
154 3
    private function checkParameters(&$nanoSleep, &$maxCount, &$duration)
155
    {
156 3
        $maxCount = Util::validateIntNull('maxCount', $maxCount, 32);
157 3
        $duration = Util::validateIntNull('duration', $duration, 32);
158 3
        $nanoSleep = Util::validateIntNull('nanoSleep', $nanoSleep, 63);
159
160 3
        $this->validateNanoSleep($nanoSleep);
161 3
        $this->validateMaxCountDuration($maxCount, $duration);
162 3
    }
163
164
    /**
165
     * @param int|null $maxCount
166
     * @param int|null $duration
167
     *
168
     * @throws \InvalidArgumentException
169
     */
170 3
    protected function validateMaxCountDuration($maxCount, $duration)
171
    {
172 3
        if (0 === $maxCount && 0 === $duration) {
173 2
            throw new \InvalidArgumentException('maxCount and duration can not both be 0');
174
        }
175 3
        if (null === $maxCount && null === $duration) {
176
            throw new \InvalidArgumentException('maxCount and duration can not both be null');
177
        }
178 3
    }
179
180
    /**
181
     * @param int|null $nanoSleep
182
     *
183
     * @throws \InvalidArgumentException
184
     */
185 3
    protected function validateNanoSleep($nanoSleep)
186
    {
187 3
        if (null === $nanoSleep) {
188
            throw new \InvalidArgumentException("nanoSleep can't be null");
189
        }
190 3
    }
191
192
    /**
193
     * @param int|null $duration
194
     *
195
     * @return \DateTime|null
196
     */
197 3
    protected function getEndTime(Run $run, $duration)
198
    {
199 3
        $endTime = null;
200 3
        if (null !== $duration) {
201 2
            $interval = new \DateInterval("PT${duration}S");
202 2
            $endTime = clone $run->getStartedAt();
203 2
            $endTime->add($interval);
204
        }
205
206 3
        return $endTime;
207
    }
208
209
    /**
210
     * @param Run      $run
211
     * @param Job|null $job
212
     * @param bool     $noMoreJobsToRun
213
     * @param int      $currentJob
214
     * @param int|null $duration
215
     * @param int      $nanoSleep
216
     */
217 3
    protected function runCurrentJob($run, $job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
218
    {
219 3
        if (null !== $job) {
220 3
            $noMoreJobsToRun = false;
221 3
            $this->reportJob($job);
222 3
            $this->runManager->updateProcessed($run, $currentJob);
223 3
            ++$currentJob;
224
        } else {
225 3
            if (!$noMoreJobsToRun) {
226 3
                $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
227 3
                $noMoreJobsToRun = true;
228
            }
229 3
            if (null !== $duration) {
230 2
                if ($nanoSleep > 0) {
231 2
                    $nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep);
232 2
                    time_nanosleep(0, $nanoSleepTime);
233
                }
234
            }
235
        }
236 3
    }
237
238
    /**
239
     * @param $maxCount
240
     * @param $duration
241
     * @param $processTimeout
242
     */
243 1
    public function checkMaxCountDuration(&$maxCount, &$duration, &$processTimeout)
244
    {
245 1
        if (null !== $duration && null !== $processTimeout && $duration >= $processTimeout) {
246
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".(2 * $processTimeout));
247
            $processTimeout *= 2;
248
        }
249
250 1
        if (null === $maxCount && null === $duration) {
251 1
            $maxCount = 1;
252
        }
253 1
    }
254
255
    /**
256
     * Determine if the run loop is finished.
257
     *
258
     * @param int|null       $maxCount
259
     * @param int            $currentJob
260
     * @param \DateTime|null $endTime
261
     * @param bool           $noMoreJobsToRun
262
     *
263
     * @return bool
264
     */
265 3
    protected function isFinished($maxCount, $duration, $endTime, $currentJob, $noMoreJobsToRun)
266
    {
267 3
        if (null === $maxCount) {
268 2
            return $this->isFinishedEndTime($duration, $endTime);
0 ignored issues
show
Bug introduced by
It seems like $endTime can also be of type null; however, parameter $endTime of Dtc\QueueBundle\Run\Loop::isFinishedEndTime() does only seem to accept DateTime, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

268
            return $this->isFinishedEndTime($duration, /** @scrutinizer ignore-type */ $endTime);
Loading history...
269
        }
270 3
        if ($currentJob <= $maxCount) {
271 3
            return $this->isFinishedJobs($duration, $endTime, $noMoreJobsToRun);
272
        }
273
274 3
        return true;
275
    }
276
277
    /**
278
     * @param \DateTime|null $endTime
279
     * @param bool           $noMoreJobsToRun
280
     *
281
     * @return bool
282
     */
283 3
    protected function isFinishedJobs($duration, $endTime, $noMoreJobsToRun)
284
    {
285 3
        if (null === $endTime) { // This means that there is a $maxCount as we force one or the other to be not null
286 3
            if ($noMoreJobsToRun) {
287 3
                return true;
288
            }
289
290 3
            return false;
291
        }
292
293
        return $this->isFinishedEndTime($duration, $endTime);
294
    }
295
296
    /**
297
     * @return bool
298
     */
299 2
    protected function isFinishedEndTime($duration, \DateTime $endTime)
300
    {
301 2
        if (0 === $duration) {
302
            return false;
303
        }
304 2
        $now = Util::getMicrotimeDateTime();
305 2
        if ($endTime > $now) {
306 2
            return false;
307
        }
308
309 2
        return true;
310
    }
311
312 3
    protected function reportJob(Job $job)
313
    {
314 3
        if (BaseJob::STATUS_EXCEPTION == $job->getStatus()) {
315
            $message = "Exception with job id: {$job->getId()}\n".$job->getMessage();
316
            $this->log('error', $message);
317
        }
318
319 3
        $message = "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n";
320 3
        $this->log('info', $message);
321 3
    }
322
323
    /**
324
     * @param string $level
325
     */
326 3
    public function log($level, $msg, array $context = [])
327
    {
328 3
        if ($this->logger) {
329
            $this->logger->$level($msg, $context);
330
331
            return;
332
        }
333
334 3
        if ($this->output) {
335 1
            $date = \Dtc\QueueBundle\Util\Util::getMicrotimeDateTime();
336 1
            $this->output->write("[$level] [".$date->format('c').'] '.$msg);
337 1
            if (!empty($context)) {
338 1
                $this->output->write(print_r($context, true));
339
            }
340 1
            $this->output->writeln('');
341
        }
342 3
    }
343
}
344