Completed
Push — master ( 859c70...b3ba31 )
by Matthew
06:19
created

Loop::getEndTime()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 7
nc 2
nop 1
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\Run;
4
5
use Dtc\QueueBundle\Doctrine\BaseJobManager;
6
use Dtc\QueueBundle\Doctrine\BaseRunManager;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\Job;
9
use Dtc\QueueBundle\Model\JobManagerInterface;
10
use Dtc\QueueBundle\Model\Run;
11
use Dtc\QueueBundle\Model\RunManager;
12
use Dtc\QueueBundle\Model\WorkerManager;
13
use Dtc\QueueBundle\Util\Util;
14
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
15
use Psr\Log\LoggerInterface;
16
use Symfony\Component\Console\Output\OutputInterface;
17
18
class Loop
19
{
20
    /** @var Run|null Run object created by the most recent runStart() call */
    protected $run;

    /** @var OutputInterface|null Console output that log() writes to when no logger is set */
    protected $output;

    /** @var LoggerInterface|null Logger that log() prefers over the console output */
    protected $logger;

    /** @var WorkerManager */
    protected $workerManager;

    /** @var JobManagerInterface */
    protected $jobManager;

    /** @var RunManager */
    protected $runManager;

    /** @var int|null Value copied onto every new run by runStart() */
    protected $processTimeout;
40
41 3
    /**
     * Stores the three managers the loop works with.
     *
     * @param WorkerManager       $workerManager runs the jobs
     * @param JobManagerInterface $jobManager    used by runJobById() to look a job up
     * @param RunManager          $runManager    supplies the run class and records job runs
     */
    public function __construct(
        WorkerManager $workerManager,
        JobManagerInterface $jobManager,
        RunManager $runManager
    ) {
        $this->runManager = $runManager;
        $this->jobManager = $jobManager;
        $this->workerManager = $workerManager;
    }
50
51
    /**
     * The process timeout that runStart() copies onto each new run.
     *
     * @return int|null null until setProcessTimeout() has been called
     */
    public function getProcessTimeout()
    {
        $timeout = $this->processTimeout;

        return $timeout;
    }
58
59
    /**
     * Sets the process timeout that runStart() copies onto each new run.
     *
     * @param int $processTimeout
     */
    public function setProcessTimeout(int $processTimeout)
    {
        $timeout = $processTimeout;
        $this->processTimeout = $timeout;
    }
66
67
    /**
     * Sets the logger; once set, log() sends every message to it instead of the console output.
     *
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $target = $logger;
        $this->logger = $target;
    }
71
72 1
    /**
     * Sets the console output that log() falls back to when no logger is set.
     *
     * @param OutputInterface $output
     */
    public function setOutput(OutputInterface $output)
    {
        $target = $output;
        $this->output = $target;
    }
76
77
    /**
     * Returns the run object of the current (or most recently started) run.
     *
     * @return Run|null null when no run has been started yet
     */
    public function getRun()
    {
        $currentRun = $this->run;

        return $currentRun;
    }
86
87
    /**
     * Runs one job, looked up by its id, inside its own run.
     *
     * @param float $start timestamp (seconds, with fraction) the run is measured from
     * @param mixed $jobId identifier passed to the job repository's find()
     *
     * @throws ClassNotSubclassException when the job manager is not a BaseJobManager
     */
    public function runJobById($start, $jobId)
    {
        $this->runStart($start);

        if (!$this->jobManager instanceof BaseJobManager) {
            // Close the run opened above before bailing out; otherwise a run that
            // was persisted by runStart() would never be removed again.
            $this->runStop($start);

            throw new ClassNotSubclassException("Can't get job by id when not using a database/datastore backed queue (such as mongodb or an RDBMS)");
        }

        /** @var Job $job */
        $job = $this->jobManager->getRepository()->find($jobId);
        if (!$job) {
            $this->log('error', "Job id is not found: {$jobId}");
            $this->runStop($start);

            return;
        }

        $job = $this->workerManager->runJob($job);
        $this->reportJob($job);
        $this->run->setProcessed(1);
        $this->runStop($start);

        return;
    }
114
115
    /**
     * Keeps asking the worker manager for jobs until the count or time limit is reached.
     *
     * @param float    $start      timestamp (seconds, with fraction) the run is measured from
     * @param mixed    $workerName passed through to WorkerManager::run()
     * @param mixed    $methodName passed through to WorkerManager::run()
     * @param null|int $maxCount   maximum number of jobs to process, or null for no count limit
     * @param null|int $duration   number of seconds to keep running, or null for no time limit
     * @param int      $nanoSleep  upper bound, in nanoseconds, of the random pause taken when no job was returned
     *
     * @return int always 0
     *
     * @throws \InvalidArgumentException when the limits fail validation
     */
    public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
    {
        $this->checkParameters($nanoSleep, $maxCount, $duration);
        $this->workerManager->setLoggingFunc([$this, 'log']);
        $this->runStart($start, $maxCount, $duration);
        try {
            $this->log('info', 'Staring up a new job...');

            $endTime = $this->getEndTime($duration);
            $currentJob = 1;
            $noMoreJobsToRun = false;
            do {
                $job = $this->workerManager->run($workerName, $methodName, true, $this->run->getId());
                $this->recordHeartbeat($start, $job);
                $this->runCurrentJob($job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
            } while (!$this->isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun));
        } catch (\Throwable $e) {
            // Uncaught error: possibly with QueueBundle itself.
            // \Throwable (not just \Exception) so that engine errors such as
            // TypeError are logged too and the run is still closed below.
            $this->log('critical', $e->getMessage(), $e->getTrace());
        }
        $this->runStop($start);

        return 0;
    }
145
146
    /**
     * Normalises the three loop limits in place, then validates them.
     *
     * Each argument is passed by reference and overwritten with the value
     * returned by Util::validateIntNull().
     *
     * @param int      $nanoSleep
     * @param null|int $maxCount
     * @param null|int $duration
     *
     * @throws \InvalidArgumentException
     */
    private function checkParameters(&$nanoSleep, &$maxCount, &$duration)
    {
        // Normalisation order is kept: maxCount, duration, nanoSleep.
        $normalisedMaxCount = Util::validateIntNull('maxCount', $maxCount, 32);
        $maxCount = $normalisedMaxCount;

        $normalisedDuration = Util::validateIntNull('duration', $duration, 32);
        $duration = $normalisedDuration;

        $normalisedNanoSleep = Util::validateIntNull('nanoSleep', $nanoSleep, 63);
        $nanoSleep = $normalisedNanoSleep;

        $this->validateNanoSleep($nanoSleep);
        $this->validateMaxCountDuration($maxCount, $duration);
    }
162
163
    /**
     * Rejects limit combinations that give the loop nothing to stop on.
     *
     * @param int|null $maxCount
     * @param int|null $duration
     *
     * @throws \InvalidArgumentException when both limits are 0, or both are null
     */
    protected function validateMaxCountDuration($maxCount, $duration)
    {
        $bothZero = 0 === $maxCount && 0 === $duration;
        if ($bothZero) {
            throw new \InvalidArgumentException('maxCount and duration can not both be 0');
        }

        $bothNull = null === $maxCount && null === $duration;
        if ($bothNull) {
            throw new \InvalidArgumentException('maxCount and duration can not both be null');
        }
    }
178
179
    /**
     * Ensures a sleep bound was supplied.
     *
     * @param int|null $nanoSleep
     *
     * @throws \InvalidArgumentException when $nanoSleep is null
     */
    protected function validateNanoSleep($nanoSleep)
    {
        if (null !== $nanoSleep) {
            return;
        }

        throw new \InvalidArgumentException("nanoSleep can't be null");
    }
190
191
    /**
     * Computes the moment the current run should stop: its start time plus $duration seconds.
     *
     * @param int|null $duration number of seconds the run may last, or null for no time limit
     *
     * @return null|\DateTime null when $duration is null
     */
    protected function getEndTime($duration)
    {
        $endTime = null;
        if (null !== $duration) {
            // "{$duration}" instead of "${duration}": the dollar-brace interpolation
            // form is deprecated as of PHP 8.2.
            $interval = new \DateInterval("PT{$duration}S");
            // Clone so that adding the interval does not move the run's own start time.
            $endTime = clone $this->run->getStartedAt();
            $endTime->add($interval);
        }

        return $endTime;
    }
207
208
    /**
     * Handles the outcome of one loop iteration.
     *
     * When a job was returned it is reported and counted. When none was
     * returned, that is logged once and, if the loop is time-bound, the
     * process sleeps for a random period of at most $nanoSleep nanoseconds.
     *
     * @param Job|null $job             job returned by the worker manager, or null when there was none
     * @param bool     $noMoreJobsToRun by reference; true while the previous iteration(s) found no job
     * @param int      $currentJob      by reference; 1-based number of the job about to be counted
     * @param int|null $duration        time limit of the loop in seconds, or null
     * @param int      $nanoSleep       upper bound of the random sleep, in nanoseconds
     */
    protected function runCurrentJob($job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
    {
        if (null !== $job) {
            $noMoreJobsToRun = false;
            $this->reportJob($job);
            $this->updateProcessed($currentJob);
            ++$currentJob;
        } else {
            if (!$noMoreJobsToRun) {
                $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
                $noMoreJobsToRun = true;
            }
            if (null !== $duration) {
                $nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep);
                // time_nanosleep() requires its nanoseconds argument to be below one
                // second, so carry whole seconds over into the first argument.
                $nanosPerSecond = 1000000000;
                time_nanosleep(intdiv($nanoSleepTime, $nanosPerSecond), $nanoSleepTime % $nanosPerSecond);
            }
        }
    }
233
234
    /**
     * Adjusts the limits in place before a run.
     *
     * Doubles $processTimeout when $duration is at least as long as it, and
     * defaults $maxCount to 1 when neither a count nor a duration was given.
     *
     * @param int|null $maxCount       by reference
     * @param int|null $duration       by reference (only read here)
     * @param int|null $processTimeout by reference
     */
    public function checkMaxCountDuration(&$maxCount, &$duration, &$processTimeout)
    {
        $timeoutTooShort = null !== $duration && null !== $processTimeout && $duration >= $processTimeout;
        if ($timeoutTooShort) {
            $doubled = 2 * $processTimeout;
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".$doubled);
            $processTimeout = $doubled;
        }

        if (null === $duration) {
            if (null === $maxCount) {
                $maxCount = 1;
            }
        }
    }
250
251
    /**
     * Determine if the run loop is finished.
     *
     * Without a job limit only the deadline decides. With a job limit, the
     * loop is finished once $currentJob exceeds it; until then
     * isFinishedJobs() decides.
     *
     * @param int|null       $maxCount        maximum number of jobs, or null for no count limit
     * @param \DateTime|null $endTime         deadline of the run, or null for no time limit
     * @param int            $currentJob      1-based number of the next job
     * @param bool           $noMoreJobsToRun whether the last iteration found no job
     *
     * @return bool
     */
    protected function isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun)
    {
        if (null === $maxCount) {
            // isFinishedEndTime() does not accept null. With neither a job limit
            // nor a deadline there is nothing left to wait for, so report finished
            // instead of triggering a TypeError.
            return null === $endTime || $this->isFinishedEndTime($endTime);
        }
        if ($currentJob <= $maxCount) {
            return $this->isFinishedJobs($endTime, $noMoreJobsToRun);
        }

        return true;
    }
272
273
    /**
     * Decides whether a count-limited loop that has not reached its limit is done.
     *
     * With a deadline, the deadline decides; without one, the loop is done as
     * soon as no more jobs are available.
     *
     * @param \DateTime|null $endTime
     * @param bool           $noMoreJobsToRun
     *
     * @return bool
     */
    protected function isFinishedJobs($endTime, $noMoreJobsToRun)
    {
        if (null !== $endTime) {
            return $this->isFinishedEndTime($endTime);
        }

        // No deadline, so a $maxCount must be in force (one of the two is always set).
        return $noMoreJobsToRun ? true : false;
    }
291
292
    /**
     * Tells whether the given deadline has been reached.
     *
     * @param \DateTime $endTime
     *
     * @return bool false while $endTime is still in the future, true otherwise
     */
    protected function isFinishedEndTime(\DateTime $endTime)
    {
        $current = new \DateTime();
        $stillInFuture = $endTime > $current;

        return !$stillInFuture;
    }
306
307
    /**
     * Stamps the run with the current time, the job being handled and the
     * elapsed time, then persists it.
     *
     * @param float    $start timestamp (seconds, with fraction) the run is measured from
     * @param Job|null $job   job returned for this iteration, if any
     */
    protected function recordHeartbeat($start, Job $job = null)
    {
        $currentJobId = null === $job ? null : $job->getId();

        $run = $this->run;
        $run->setLastHeartbeatAt(new \DateTime());
        $run->setCurrentJobId($currentJobId);
        $run->setElapsed(microtime(true) - $start);

        $this->persistRun();
    }
323
324
    /**
     * Applies an object-manager operation to the current run and flushes.
     *
     * Does nothing unless the run manager is a BaseRunManager.
     *
     * @param string $action name of the object-manager method to call with the run
     */
    protected function persistRun($action = 'persist')
    {
        $manager = $this->runManager;
        if (!$manager instanceof BaseRunManager) {
            return;
        }

        $storage = $manager->getObjectManager();
        $storage->$action($this->run);
        $storage->flush();
    }
332
333
    /**
     * Stores the processed-job count on the run and persists it.
     *
     * @param int $count
     */
    protected function updateProcessed($count)
    {
        $run = $this->run;
        $run->setProcessed($count);

        $this->persistRun();
    }
341
342
    /**
     * Creates the run object for a new run, fills in its initial state and
     * persists it (persistRun() only stores it for a BaseRunManager).
     *
     * @param float    $start    timestamp (seconds, with fraction) at which the run began
     * @param int|null $maxCount stored on the run when not null
     * @param int|null $duration stored on the run when not null
     */
    protected function runStart($start, $maxCount = null, $duration = null)
    {
        $runClass = $this->runManager->getRunClass();
        $this->run = new $runClass();
        // Format the float explicitly: a plain string cast drops the fractional
        // part for whole-second values, which makes the 'U.u' parse fail and
        // return false. %F is also locale-independent.
        $startDate = \DateTime::createFromFormat('U.u', sprintf('%.6F', $start));
        $this->run->setLastHeartbeatAt($startDate);
        $this->run->setStartedAt($startDate);
        if (null !== $maxCount) {
            $this->run->setMaxCount($maxCount);
        }
        if (null !== $duration) {
            $this->run->setDuration($duration);
        }
        $this->run->setHostname(gethostname());
        $this->run->setPid(getmypid());
        $this->run->setProcessed(0);
        $this->run->setProcessTimeout($this->processTimeout);
        $this->persistRun();
    }
368
369
    /**
     * Closes the current run: records end time and elapsed time, applies the
     * 'remove' operation through persistRun() and logs a summary.
     *
     * @param float $start timestamp (seconds, with fraction) the run is measured from
     */
    protected function runStop($start)
    {
        $end = microtime(true);
        // Format the float explicitly: a plain string cast drops the fractional
        // part for whole-second values, which makes the 'U.u' parse fail and
        // would silently leave the end time unset. %F is also locale-independent.
        $endedTime = \DateTime::createFromFormat('U.u', sprintf('%.6F', $end));
        if ($endedTime) {
            $this->run->setEndedAt($endedTime);
        }
        $this->run->setElapsed($end - $start);
        $this->persistRun('remove');
        $this->log('info', 'Ended with '.$this->run->getProcessed().' job(s) processed over '.strval($this->run->getElapsed()).' seconds.');
    }
383
384
    /**
     * Records a finished job with the run manager and logs its outcome.
     *
     * An error entry is logged first when the job ended in the error status;
     * the "Finished" info entry is logged in every case.
     *
     * @param Job $job
     */
    protected function reportJob(Job $job)
    {
        $this->runManager->recordJobRun($job);

        $failed = BaseJob::STATUS_ERROR == $job->getStatus();
        if ($failed) {
            $this->log('error', sprintf("Error with job id: %s\n%s", $job->getId(), $job->getMessage()));
        }

        $this->log('info', sprintf("Finished job id: %s in %s seconds\n", $job->getId(), $job->getElapsed()));
    }
399
400
    /**
     * Sends a message to the logger if one is set, otherwise to the console
     * output if one is set; with neither, the message is dropped.
     *
     * @param string $level   name of the logger method to call; also printed as the console prefix
     * @param string $msg
     * @param array  $context extra data; dumped with print_r() on the console
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logger) {
            $this->logger->$level($msg, $context);

            return;
        }

        if (!$this->output) {
            return;
        }

        $timestamp = (new \DateTime())->format('c');
        $this->output->write("[$level] [{$timestamp}] {$msg}");
        if ([] !== $context) {
            $this->output->write(print_r($context, true));
        }
        $this->output->writeln('');
    }
420
}
421