Completed
Push — master ( 1204eb...5574d8 )
by Matthew
05:57
created

Loop::getRun()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\Run;
4
5
use Dtc\QueueBundle\Doctrine\BaseJobManager;
6
use Dtc\QueueBundle\Doctrine\BaseRunManager;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\Job;
9
use Dtc\QueueBundle\Model\JobManagerInterface;
10
use Dtc\QueueBundle\Model\Run;
11
use Dtc\QueueBundle\Model\RunManager;
12
use Dtc\QueueBundle\Model\WorkerManager;
13
use Dtc\QueueBundle\Util\Util;
14
use Psr\Log\LoggerInterface;
15
use Symfony\Component\Console\Output\OutputInterface;
16
17
/**
 * Executes queued jobs - either a single job looked up by id, or a polling loop bounded by
 * a maximum job count and/or a duration - while keeping a Run record up to date.
 */
class Loop
{
    /** @var Run|null the current (last) run; null until a run has been started */
    protected $run;

    /** @var OutputInterface|null console output used by log() when no logger is set */
    protected $output;

    /** @var LoggerInterface|null logger that takes precedence over $output in log() */
    protected $logger;

    /** @var WorkerManager */
    protected $workerManager;

    /** @var JobManagerInterface */
    protected $jobManager;

    /** @var RunManager */
    protected $runManager;

    /** @var int|null copied onto every new Run by runStart() */
    protected $processTimeout;

    public function __construct(
        WorkerManager $workerManager,
        JobManagerInterface $jobManager,
        RunManager $runManager)
    {
        $this->workerManager = $workerManager;
        $this->jobManager = $jobManager;
        $this->runManager = $runManager;
    }

    /**
     * @return int
     */
    public function getProcessTimeout()
    {
        return $this->processTimeout;
    }

    /**
     * @param int $processTimeout
     */
    public function setProcessTimeout(int $processTimeout)
    {
        $this->processTimeout = $processTimeout;
    }

    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function setOutput(OutputInterface $output)
    {
        $this->output = $output;
    }

    /**
     * The current (last) run object.
     *
     * @return Run|null
     */
    public function getRun()
    {
        return $this->run;
    }

    /**
     * Runs one job, looked up by id through the job manager's repository.
     *
     * @param float $start timestamp (seconds, with fraction) at which the run started
     * @param mixed $jobId identifier handed to the repository's find()
     *
     * @throws \Exception when the job manager is not a BaseJobManager
     */
    public function runJobById($start, $jobId)
    {
        // Check the precondition before runStart() so that a failing call does not leave
        // behind a Run that was created (and possibly persisted) but never stopped.
        if (!$this->jobManager instanceof BaseJobManager) {
            throw new \Exception("Can't get job by id when not using a database/datastore backed queue (such as mongodb or an RDBMS)");
        }

        $this->runStart($start);

        /** @var Job $job */
        $job = $this->jobManager->getRepository()->find($jobId);
        if (!$job) {
            $this->log('error', "Job id is not found: {$jobId}");
            $this->runStop($start);

            return;
        }

        $job = $this->workerManager->runJob($job);
        $this->reportJob($job);
        $this->run->setProcessed(1);
        $this->runStop($start);
    }

    /**
     * Repeatedly asks the worker manager to run the next job until the maximum count or
     * the end time is reached (see isFinished()).
     *
     * Exceptions raised while looping are logged at 'critical' level rather than rethrown;
     * the run is stopped afterwards either way.
     *
     * @param float    $start      timestamp (seconds, with fraction) at which the run started
     * @param mixed    $workerName passed through to WorkerManager::run()
     * @param mixed    $methodName passed through to WorkerManager::run()
     * @param null|int $maxCount
     * @param null|int $duration   seconds
     * @param int      $nanoSleep  upper bound, in nanoseconds, of the random pause taken when no job was available
     *
     * @return int always 0
     *
     * @throws \Exception when the parameters fail validation
     */
    public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
    {
        $this->checkParameters($nanoSleep, $maxCount, $duration);
        $this->workerManager->setLoggingFunc([$this, 'log']);
        $this->runStart($start, $maxCount, $duration);
        try {
            $this->log('info', 'Staring up a new job...');

            $endTime = $this->getEndTime($duration);
            $currentJob = 1;
            $noMoreJobsToRun = false;
            do {
                $job = $this->workerManager->run($workerName, $methodName, true, $this->run->getId());
                $this->recordHeartbeat($start, $job);
                $this->runCurrentJob($job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
            } while (!$this->isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun));
        } catch (\Exception $e) {
            // Uncaught error: possibly with QueueBundle itself
            $this->log('critical', $e->getMessage(), $e->getTrace());
        }
        $this->runStop($start);

        return 0;
    }

    /**
     * Normalizes the three loop parameters through Util::validateIntNull() (the values are
     * written back through the references) and then validates them.
     *
     * @param int      $nanoSleep
     * @param null|int $maxCount
     * @param null|int $duration
     *
     * @throws \Exception
     */
    private function checkParameters(&$nanoSleep, &$maxCount, &$duration)
    {
        $maxCount = Util::validateIntNull('maxCount', $maxCount, 32);
        $duration = Util::validateIntNull('duration', $duration, 32);
        $nanoSleep = Util::validateIntNull('nanoSleep', $nanoSleep, 63);

        $this->validateNanoSleep($nanoSleep);
        $this->validateMaxCountDuration($maxCount, $duration);
    }

    /**
     * Rejects the combinations in which maxCount and duration are both 0 or both null.
     *
     * @param int|null $maxCount
     * @param int|null $duration
     *
     * @throws \Exception
     */
    protected function validateMaxCountDuration($maxCount, $duration)
    {
        if (0 === $maxCount && 0 === $duration) {
            throw new \Exception('maxCount and duration can not both be 0');
        }
        if (null === $maxCount && null === $duration) {
            throw new \Exception('maxCount and duration can not both be null');
        }
    }

    /**
     * @param int|null $nanoSleep
     *
     * @throws \Exception when $nanoSleep is null
     */
    protected function validateNanoSleep($nanoSleep)
    {
        if (null === $nanoSleep) {
            throw new \Exception("nanoSleep can't be null");
        }
    }

    /**
     * Computes the moment the loop has to stop: the run's start time plus $duration seconds.
     *
     * @param int|null $duration seconds
     *
     * @return null|\DateTime null when no duration was given
     */
    protected function getEndTime($duration)
    {
        $endTime = null;
        if (null !== $duration) {
            // Plain concatenation: the "${var}" interpolation form is deprecated as of PHP 8.2.
            $interval = new \DateInterval('PT'.$duration.'S');
            // Clone so that adding the interval does not modify the run's own start date.
            $endTime = clone $this->run->getStartedAt();
            $endTime->add($interval);
        }

        return $endTime;
    }

    /**
     * Handles the outcome of one loop iteration: reports and counts the job that ran, or -
     * when there was none - logs that once and pauses for a random time.
     *
     * @param Job|null $job             the job that was just run, or null when none was available
     * @param bool     $noMoreJobsToRun by reference; true while the most recent iterations found no job
     * @param int      $currentJob      by reference; 1-based number of the next job to process
     * @param int|null $duration        the pause is only taken when a duration is set
     * @param int      $nanoSleep       upper bound of the random pause, in nanoseconds
     */
    protected function runCurrentJob($job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
    {
        if (null !== $job) {
            $noMoreJobsToRun = false;
            $this->reportJob($job);
            $this->updateProcessed($currentJob);
            ++$currentJob;
        } else {
            if (!$noMoreJobsToRun) {
                $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
                $noMoreJobsToRun = true;
            }
            if (null !== $duration) {
                $nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep);
                // time_nanosleep() only accepts a nanosecond argument below one billion, so
                // whole seconds are carried over into its first argument.
                $nanosPerSecond = 1000000000;
                time_nanosleep(intdiv($nanoSleepTime, $nanosPerSecond), $nanoSleepTime % $nanosPerSecond);
            }
        }
    }

    /**
     * Adjusts the given limits in place: doubles the process timeout when the duration is
     * not smaller than it, and defaults maxCount to 1 when neither limit is set.
     *
     * @param int|null $maxCount       by reference
     * @param int|null $duration       by reference (only read here)
     * @param int|null $processTimeout by reference
     */
    public function checkMaxCountDuration(&$maxCount, &$duration, &$processTimeout)
    {
        if (null !== $duration && null !== $processTimeout && $duration >= $processTimeout) {
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".(2 * $processTimeout));
            $processTimeout *= 2;
        }

        if (null === $maxCount && null === $duration) {
            $maxCount = 1;
        }
    }

    /**
     * Determine if the run loop is finished.
     *
     * @param int|null       $maxCount
     * @param \DateTime|null $endTime
     * @param int            $currentJob
     * @param bool           $noMoreJobsToRun
     *
     * @return bool
     */
    protected function isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun)
    {
        if (null === $maxCount) {
            if (null === $endTime) {
                // Neither limit is set. isFinishedEndTime() does not accept null, and there
                // would be nothing to ever end the loop, so report it as finished.
                return true;
            }

            return $this->isFinishedEndTime($endTime);
        }
        if ($currentJob <= $maxCount) {
            return $this->isFinishedJobs($endTime, $noMoreJobsToRun);
        }

        return true;
    }

    /**
     * Decides whether to stop while the maximum job count has not been reached yet.
     *
     * @param \DateTime|null $endTime
     * @param bool           $noMoreJobsToRun
     *
     * @return bool
     */
    protected function isFinishedJobs($endTime, $noMoreJobsToRun)
    {
        if (null === $endTime) { // This means that there is a $maxCount as we force one or the other to be not null
            return $noMoreJobsToRun ? true : false;
        }

        return $this->isFinishedEndTime($endTime);
    }

    /**
     * @param \DateTime $endTime
     *
     * @return bool true once the current time is no longer before $endTime
     */
    protected function isFinishedEndTime(\DateTime $endTime)
    {
        $now = new \DateTime();

        return !($endTime > $now);
    }

    /**
     * Stamps the run with the current time, the id of the job that was just run (null when
     * there was none) and the elapsed time, then persists it.
     *
     * @param float    $start timestamp (seconds, with fraction) at which the run started
     * @param Job|null $job
     */
    protected function recordHeartbeat($start, $job)
    {
        $jobId = null;
        if (null !== $job) {
            $jobId = $job->getId();
        }

        $this->run->setLastHeartbeatAt(new \DateTime());
        $this->run->setCurrentJobId($jobId);
        $this->run->setElapsed(microtime(true) - $start);
        $this->persistRun();
    }

    /**
     * Applies $action to the run on the run manager's object manager and flushes; does
     * nothing unless the run manager is a BaseRunManager.
     *
     * @param string $action name of the object manager method to call with the run ('persist' or 'remove' within this class)
     */
    protected function persistRun($action = 'persist')
    {
        if ($this->runManager instanceof BaseRunManager) {
            $objectManager = $this->runManager->getObjectManager();
            $objectManager->$action($this->run);
            $objectManager->flush();
        }
    }

    /**
     * Stores the number of processed jobs on the run and persists it.
     *
     * @param int $count
     */
    protected function updateProcessed($count)
    {
        $this->run->setProcessed($count);
        $this->persistRun();
    }

    /**
     * Creates and initializes a new run object, and persists it if appropriate.
     *
     * @param float    $start    timestamp (seconds, with fraction) at which the run started
     * @param int|null $maxCount
     * @param int|null $duration
     */
    protected function runStart($start, $maxCount = null, $duration = null)
    {
        $runClass = $this->runManager->getRunClass();
        $this->run = new $runClass();
        $startDate = $this->createDateTime($start);
        if (!$startDate) {
            // Never hand a non-date to the run: getEndTime() clones the start date later on.
            $startDate = new \DateTime();
        }
        $this->run->setLastHeartbeatAt($startDate);
        $this->run->setStartedAt($startDate);
        if (null !== $maxCount) {
            $this->run->setMaxCount($maxCount);
        }
        if (null !== $duration) {
            $this->run->setDuration($duration);
        }
        $this->run->setHostname(gethostname());
        $this->run->setPid(getmypid());
        $this->run->setProcessed(0);
        $this->run->setProcessTimeout($this->processTimeout);
        $this->persistRun();
    }

    /**
     * Records the end time and elapsed time on the run, removes the run through
     * persistRun('remove') and logs a summary.
     *
     * @param float $start timestamp (seconds, with fraction) at which the run started
     */
    protected function runStop($start)
    {
        $end = microtime(true);
        $endedTime = $this->createDateTime($end);
        if ($endedTime) {
            $this->run->setEndedAt($endedTime);
        }
        $this->run->setElapsed($end - $start);
        $this->persistRun('remove');
        $this->log('info', 'Ended with '.$this->run->getProcessed().' job(s) processed over '.strval($this->run->getElapsed()).' seconds.');
    }

    /**
     * Converts a UNIX timestamp with fractional seconds into a DateTime.
     *
     * The value is rendered with exactly six decimals first: the 'U.u' format fails on a
     * string without a fractional part (a float that falls on a whole second), with more
     * than six decimals, or with a locale-specific decimal separator.
     *
     * @param float $timestamp seconds since the epoch, e.g. from microtime(true)
     *
     * @return \DateTime|false false when the value can not be parsed
     */
    private function createDateTime($timestamp)
    {
        return \DateTime::createFromFormat('U.u', sprintf('%.6F', $timestamp));
    }

    /**
     * Records the job run with the run manager and logs its outcome; a job in error status
     * gets an additional 'error' entry containing its message.
     *
     * @param Job $job
     */
    protected function reportJob(Job $job)
    {
        $this->runManager->recordJobRun($job);

        if (BaseJob::STATUS_ERROR == $job->getStatus()) {
            $message = "Error with job id: {$job->getId()}\n".$job->getMessage();
            $this->log('error', $message);
        }

        $message = "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n";
        $this->log('info', $message);
    }

    /**
     * Sends a message to the logger when one is set, otherwise writes it to the console
     * output when one is set; without either, the message is dropped.
     *
     * @param string $level   name of the logger method to call (e.g. 'info', 'error', 'critical')
     * @param string $msg
     * @param array  $context
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logger) {
            $this->logger->$level($msg, $context);

            return;
        }

        if ($this->output) {
            $date = new \DateTime();
            $this->output->write("[$level] [".$date->format('c').'] '.$msg);
            if (!empty($context)) {
                $this->output->write(print_r($context, true));
            }
            $this->output->writeln('');
        }
    }
}
420