Completed
Push — master ( 4109b7...859c70 )
by Matthew
08:07 queued 02:26
created

Loop::runLoop()   B

Complexity

Conditions 3
Paths 8

Size

Total Lines 24
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 3.0021

Importance

Changes 0
Metric Value
dl 0
loc 24
ccs 15
cts 16
cp 0.9375
rs 8.9713
c 0
b 0
f 0
cc 3
eloc 18
nc 8
nop 6
crap 3.0021
1
<?php
2
3
namespace Dtc\QueueBundle\Run;
4
5
use Dtc\QueueBundle\Doctrine\BaseJobManager;
6
use Dtc\QueueBundle\Doctrine\BaseRunManager;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\Job;
9
use Dtc\QueueBundle\Model\JobManagerInterface;
10
use Dtc\QueueBundle\Model\Run;
11
use Dtc\QueueBundle\Model\RunManager;
12
use Dtc\QueueBundle\Model\WorkerManager;
13
use Dtc\QueueBundle\Util\Util;
14
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
15
use Psr\Log\LoggerInterface;
16
use Symfony\Component\Console\Output\OutputInterface;
17
18
class Loop
19
{
20
    /** @var Run $run */
21
    protected $run;
22
23
    /** @var OutputInterface */
24
    protected $output;
25
26
    /** @var LoggerInterface */
27
    protected $logger;
28
29
    /** @var WorkerManager */
30
    protected $workerManager;
31
32
    /** @var JobManagerInterface */
33
    protected $jobManager;
34
35
    /** @var RunManager */
36
    protected $runManager;
37
38
    /** @var int */
39
    protected $processTimeout;
40
41 1
    public function __construct(
42
        WorkerManager $workerManager,
43
        JobManagerInterface $jobManager,
44
        RunManager $runManager)
45
    {
46 1
        $this->workerManager = $workerManager;
47 1
        $this->jobManager = $jobManager;
48 1
        $this->runManager = $runManager;
49 1
    }
50
51
    /**
52
     * @return int
53
     */
54
    public function getProcessTimeout()
55
    {
56
        return $this->processTimeout;
57
    }
58
59
    /**
60
     * @param int $processTimeout
61
     */
62
    public function setProcessTimeout(int $processTimeout)
63
    {
64
        $this->processTimeout = $processTimeout;
65
    }
66
67
    public function setLogger(LoggerInterface $logger)
68
    {
69
        $this->logger = $logger;
70
    }
71
72 1
    public function setOutput(OutputInterface $output)
73
    {
74 1
        $this->output = $output;
75 1
    }
76
77
    /**
78
     * The current (last) run object.
79
     *
80
     * @return Run|null
81
     */
82 1
    public function getRun()
83
    {
84 1
        return $this->run;
85
    }
86
87
    /**
88
     * @param float $start
89
     */
90
    public function runJobById($start, $jobId)
91
    {
92
        $this->runStart($start);
93
94
        if (!$this->jobManager instanceof BaseJobManager) {
95
            throw new ClassNotSubclassException("Can't get job by id when not using a database/datastore backed queue (such as mongodb or an RDBMS)");
96
        }
97
98
        /** @var Job $job */
99
        $job = $this->jobManager->getRepository()->find($jobId);
100
        if (!$job) {
101
            $this->log('error', "Job id is not found: {$jobId}");
102
            $this->runStop($start);
103
104
            return;
105
        }
106
107
        $job = $this->workerManager->runJob($job);
108
        $this->reportJob($job);
109
        $this->run->setProcessed(1);
110
        $this->runStop($start);
111
112
        return;
113
    }
114
115
    /**
116
     * @param float    $start
117
     * @param int      $nanoSleep
118
     * @param null|int $maxCount
119
     * @param null|int $duration
120
     */
121 1
    public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
122
    {
123 1
        $this->checkParameters($nanoSleep, $maxCount, $duration);
124 1
        $this->workerManager->setLoggingFunc([$this, 'log']);
125 1
        $this->runStart($start, $maxCount, $duration);
126
        try {
127 1
            $this->log('info', 'Staring up a new job...');
128
129 1
            $endTime = $this->getEndTime($duration);
130 1
            $currentJob = 1;
131 1
            $noMoreJobsToRun = false;
132
            do {
133 1
                $job = $this->workerManager->run($workerName, $methodName, true, $this->run->getId());
134 1
                $this->recordHeartbeat($start, $job);
135 1
                $this->runCurrentJob($job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
136 1
            } while (!$this->isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun));
137 1
        } catch (\Exception $e) {
138
            // Uncaught error: possibly with QueueBundle itself
139
            $this->log('critical', $e->getMessage(), $e->getTrace());
140
        }
141 1
        $this->runStop($start);
142
143 1
        return 0;
144
    }
145
146
    /**
147
     * @param int      $nanoSleep
148
     * @param null|int $maxCount
149
     * @param null|int $duration
150
     *
151
     * @throws \InvalidArgumentException
152
     */
153 1
    private function checkParameters(&$nanoSleep, &$maxCount, &$duration)
154
    {
155 1
        $maxCount = Util::validateIntNull('maxCount', $maxCount, 32);
156 1
        $duration = Util::validateIntNull('duration', $duration, 32);
157 1
        $nanoSleep = Util::validateIntNull('nanoSleep', $nanoSleep, 63);
158
159 1
        $this->validateNanoSleep($nanoSleep);
160 1
        $this->validateMaxCountDuration($maxCount, $duration);
161 1
    }
162
163
    /**
164
     * @param int|null $maxCount
165
     * @param int|null $duration
166
     *
167
     * @throws \InvalidArgumentException
168
     */
169 1
    protected function validateMaxCountDuration($maxCount, $duration)
170
    {
171 1
        if (0 === $maxCount && 0 === $duration) {
172
            throw new \InvalidArgumentException('maxCount and duration can not both be 0');
173
        }
174 1
        if (null === $maxCount && null === $duration) {
175
            throw new \InvalidArgumentException('maxCount and duration can not both be null');
176
        }
177 1
    }
178
179
    /**
180
     * @param int|null $nanoSleep
181
     *
182
     * @throws \InvalidArgumentException
183
     */
184 1
    protected function validateNanoSleep($nanoSleep)
185
    {
186 1
        if (null === $nanoSleep) {
187
            throw new \InvalidArgumentException("nanoSleep can't be null");
188
        }
189 1
    }
190
191
    /**
192
     * @param int|null $duration
193
     *
194
     * @return null|\DateTime
195
     */
196 1
    protected function getEndTime($duration)
197
    {
198 1
        $endTime = null;
199 1
        if (null !== $duration) {
200 1
            $interval = new \DateInterval("PT${duration}S");
201 1
            $endTime = clone $this->run->getStartedAt();
202 1
            $endTime->add($interval);
203 1
        }
204
205
        return $endTime;
206
    }
207
208
    /**
209
     * @param Job|null $job
210
     * @param bool     $noMoreJobsToRun
211
     * @param int      $currentJob
212
     * @param int|null $duration
213
     * @param int      $nanoSleep
214
     */
215
    protected function runCurrentJob($job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
216
    {
217 1
        if (null !== $job) {
218 1
            $noMoreJobsToRun = false;
219 1
            $this->reportJob($job);
220 1
            $this->updateProcessed($currentJob);
221 1
            ++$currentJob;
222 1
        } else {
223 1
            if (!$noMoreJobsToRun) {
224 1
                $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
225 1
                $noMoreJobsToRun = true;
226 1
            }
227 1
            if (null !== $duration) {
228 1
                $nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep);
229 1
                time_nanosleep(0, $nanoSleepTime);
230 1
            }
231
        }
232 1
    }
233
234
    /**
235
     * @param $maxCount
236
     * @param $duration
237
     * @param $processTimeout
238
     */
239
    public function checkMaxCountDuration(&$maxCount, &$duration, &$processTimeout)
240
    {
241 1
        if (null !== $duration && null !== $processTimeout && $duration >= $processTimeout) {
242
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".(2 * $processTimeout));
243
            $processTimeout *= 2;
244
        }
245
246 1
        if (null === $maxCount && null === $duration) {
247 1
            $maxCount = 1;
248 1
        }
249 1
    }
250
251
    /**
252
     * Determine if the run loop is finished.
253
     *
254
     * @param int|null       $maxCount
255
     * @param int            $currentJob
256
     * @param \DateTime|null $endTime
257
     * @param bool           $noMoreJobsToRun
258
     *
259
     * @return bool
260
     */
261
    protected function isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun)
262
    {
263 1
        if (null === $maxCount) {
264 1
            return $this->isFinishedEndTime($endTime);
0 ignored issues
show
Bug introduced by
It seems like $endTime defined by parameter $endTime on line 261 can be null; however, Dtc\QueueBundle\Run\Loop::isFinishedEndTime() does not accept null, maybe add an additional type check?

It seems like you allow that null is being passed for a parameter, however the function which is called does not seem to accept null.

We recommend to add an additional type check (or disallow null for the parameter):

function notNullable(stdClass $x) { }

// Unsafe
function withoutCheck(stdClass $x = null) {
    notNullable($x);
}

// Safe - Alternative 1: Adding Additional Type-Check
function withCheck(stdClass $x = null) {
    if ($x instanceof stdClass) {
        notNullable($x);
    }
}

// Safe - Alternative 2: Changing Parameter
function withNonNullableParam(stdClass $x) {
    notNullable($x);
}
Loading history...
265
        }
266 1
        if ($currentJob <= $maxCount) {
267 1
            return $this->isFinishedJobs($endTime, $noMoreJobsToRun);
268
        }
269
270 1
        return true;
271
    }
272
273
    /**
274
     * @param \DateTime|null $endTime
275
     * @param bool           $noMoreJobsToRun
276
     *
277
     * @return bool
278
     */
279
    protected function isFinishedJobs($endTime, $noMoreJobsToRun)
280
    {
281 1
        if (null === $endTime) { // This means that there is a $maxCount as we force one or the other to be not null
282 1
            if ($noMoreJobsToRun) {
283 1
                return true;
284
            }
285
286 1
            return false;
287
        }
288
289
        return $this->isFinishedEndTime($endTime);
290
    }
291
292
    /**
293
     * @param \DateTime $endTime
294
     *
295
     * @return bool
296
     */
297
    protected function isFinishedEndTime(\DateTime $endTime)
298
    {
299 1
        $now = new \DateTime();
300 1
        if ($endTime > $now) {
301 1
            return false;
302
        }
303
304 1
        return true;
305
    }
306
307
    /**
308
     * @param float    $start
309
     * @param Job|null $job
310
     */
311
    protected function recordHeartbeat($start, Job $job = null)
312
    {
313 1
        $jobId = null;
314 1
        if (null !== $job) {
315 1
            $jobId = $job->getId();
316 1
        }
317
318 1
        $this->run->setLastHeartbeatAt(new \DateTime());
319 1
        $this->run->setCurrentJobId($jobId);
320 1
        $this->run->setElapsed(microtime(true) - $start);
321 1
        $this->persistRun();
322 1
    }
323
324
    protected function persistRun($action = 'persist')
325
    {
326 1
        if ($this->runManager instanceof BaseRunManager) {
327 1
            $objectManager = $this->runManager->getObjectManager();
328 1
            $objectManager->$action($this->run);
329 1
            $objectManager->flush();
330 1
        }
331 1
    }
332
333
    /**
334
     * @param int $count
335
     */
336
    protected function updateProcessed($count)
337
    {
338 1
        $this->run->setProcessed($count);
339 1
        $this->persistRun();
340 1
    }
341
342
    /**
343
     * Sets up the runManager (document / entity persister) if appropriate.
344
     *
345
     * @param float    $start
346
     * @param int|null $maxCount
347
     * @param int|null $duration
348
     */
349
    protected function runStart($start, $maxCount = null, $duration = null)
350
    {
351 1
        $runClass = $this->runManager->getRunClass();
352 1
        $this->run = new $runClass();
353 1
        $startDate = \DateTime::createFromFormat('U.u', $start);
354 1
        $this->run->setLastHeartbeatAt($startDate);
355 1
        $this->run->setStartedAt($startDate);
356 1
        if (null !== $maxCount) {
357 1
            $this->run->setMaxCount($maxCount);
358 1
        }
359 1
        if (null !== $duration) {
360 1
            $this->run->setDuration($duration);
361 1
        }
362 1
        $this->run->setHostname(gethostname());
363 1
        $this->run->setPid(getmypid());
364 1
        $this->run->setProcessed(0);
365 1
        $this->run->setProcessTimeout($this->processTimeout);
366 1
        $this->persistRun();
367 1
    }
368
369
    /**
370
     * @param int|null $start
371
     */
372
    protected function runStop($start)
373
    {
374 1
        $end = microtime(true);
375 1
        $endedTime = \DateTime::createFromFormat('U.u', $end);
376 1
        if ($endedTime) {
377 1
            $this->run->setEndedAt($endedTime);
378 1
        }
379 1
        $this->run->setElapsed($end - $start);
380 1
        $this->persistRun('remove');
381 1
        $this->log('info', 'Ended with '.$this->run->getProcessed().' job(s) processed over '.strval($this->run->getElapsed()).' seconds.');
382 1
    }
383
384
    /**
385
     * @param Job $job
386
     */
387
    protected function reportJob(Job $job)
388
    {
389 1
        $this->runManager->recordJobRun($job);
390
391 1
        if (BaseJob::STATUS_ERROR == $job->getStatus()) {
392
            $message = "Error with job id: {$job->getId()}\n".$job->getMessage();
393
            $this->log('error', $message);
394
        }
395
396 1
        $message = "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n";
397 1
        $this->log('info', $message);
398 1
    }
399
400
    /**
401
     * @param string $level
402
     */
403
    public function log($level, $msg, array $context = [])
404
    {
405 1
        if ($this->logger) {
406
            $this->logger->$level($msg, $context);
407
408
            return;
409
        }
410
411 1
        if ($this->output) {
412 1
            $date = new \DateTime();
413 1
            $this->output->write("[$level] [".$date->format('c').'] '.$msg);
414 1
            if (!empty($context)) {
415 1
                $this->output->write(print_r($context, true));
416 1
            }
417 1
            $this->output->writeln('');
418 1
        }
419 1
    }
420
}
421