Completed
Push — master ( 17631a...07adb9 )
by Matthew
05:11
created

Loop::recordHeartbeat()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 4
cts 4
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\Run;
4
5
use Dtc\QueueBundle\Doctrine\BaseJobManager;
6
use Dtc\QueueBundle\Doctrine\BaseRunManager;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\Job;
9
use Dtc\QueueBundle\Model\JobManagerInterface;
10
use Dtc\QueueBundle\Model\Run;
11
use Dtc\QueueBundle\Model\RunManager;
12
use Dtc\QueueBundle\Model\WorkerManager;
13
use Dtc\QueueBundle\Util\Util;
14
use Psr\Log\LoggerInterface;
15
use Symfony\Component\Console\Output\OutputInterface;
16
17
class Loop
18
{
19
    /** @var Run $run */
20
    protected $run;
21
22
    /** @var OutputInterface */
23
    protected $output;
24
25
    /** @var LoggerInterface */
26
    protected $logger;
27
28
    /** @var WorkerManager */
29
    protected $workerManager;
30
31
    /** @var JobManagerInterface */
32
    protected $jobManager;
33
34
    /** @var RunManager */
35
    protected $runManager;
36
37 3
    /**
     * Stores the three managers this loop works with.
     *
     * @param WorkerManager       $workerManager used to run jobs
     * @param JobManagerInterface $jobManager    used to look jobs up by id
     * @param RunManager          $runManager    supplies the Run class and, when database backed, its object manager
     */
    public function __construct(
        WorkerManager $workerManager,
        JobManagerInterface $jobManager,
        RunManager $runManager
    ) {
        $this->runManager = $runManager;
        $this->jobManager = $jobManager;
        $this->workerManager = $workerManager;
    }
46
47
    /**
     * Registers a logger. Once set, log() sends every message to it and
     * no longer writes to the console output.
     *
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
51
52 1
    /**
     * Registers a console output. log() writes to it only when no logger
     * has been set.
     *
     * @param OutputInterface $output
     */
    public function setOutput(OutputInterface $output)
    {
        $this->output = $output;
    }
56
57
    /**
     * Returns the Run object created by the most recent runStart() call.
     *
     * @return Run|null null until a run has been started
     */
    public function getRun()
    {
        return $this->run;
    }
66
67
    /**
     * Runs a single job, looked up by its id, inside its own Run record.
     *
     * The Run record is opened first and is always closed again through
     * runStop(), including when the job manager is unsupported or running
     * the job throws, so a failure does not leave an unfinished Run behind.
     *
     * @param float $start timestamp (seconds, with fraction) at which the run began
     * @param mixed $jobId identifier handed to the job repository's find()
     *
     * @throws \Exception when the job manager is not a BaseJobManager
     */
    public function runJobById($start, $jobId)
    {
        $this->runStart($start);

        try {
            if (!$this->jobManager instanceof BaseJobManager) {
                throw new \Exception("Can't get job by id when not using a database/datastore backed queue (such as mongodb or an RDBMS)");
            }

            /** @var Job|null $job */
            $job = $this->jobManager->getRepository()->find($jobId);
            if (!$job) {
                $this->log('error', "Job id is not found: {$jobId}");

                return;
            }

            $job = $this->workerManager->runJob($job);
            $this->reportJob($job);
            $this->run->setProcessed(1);
        } finally {
            // Runs on every exit path (normal, early return, exception).
            $this->runStop($start);
        }
    }
94
95
    /**
     * Repeatedly asks the worker manager for the next job until the loop is
     * finished (see isFinished()), recording a heartbeat before each attempt.
     *
     * Any \Exception raised inside the loop is logged at 'critical' level
     * rather than rethrown. runStop() is called in a finally block so the
     * Run record is closed on every exit path.
     *
     * @param float       $start      timestamp (seconds, with fraction) at which the run began
     * @param string|null $workerName passed through to WorkerManager::run()
     * @param string|null $methodName passed through to WorkerManager::run()
     * @param int|null    $maxCount   maximum number of jobs to process
     * @param int|null    $duration   maximum run time in seconds
     * @param int         $nanoSleep  upper bound, in nanoseconds, of the random pause taken when no job was found
     *
     * @return int always 0
     *
     * @throws \Exception when the parameters fail validation in checkParameters()
     */
    public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
    {
        $this->checkParameters($nanoSleep, $maxCount, $duration);
        $this->workerManager->setLoggingFunc([$this, 'log']);
        $this->runStart($start, $maxCount, $duration);
        try {
            $this->log('info', 'Starting up a new job...');

            $endTime = $this->getEndTime($duration);
            $currentJob = 1;
            $noMoreJobsToRun = false;
            do {
                $this->recordHeartbeat($start);
                $job = $this->workerManager->run($workerName, $methodName, true, $this->run->getId());
                // $noMoreJobsToRun and $currentJob are updated by reference.
                $this->runCurrentJob($job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
            } while (!$this->isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun));
        } catch (\Exception $e) {
            // Uncaught error: possibly with QueueBundle itself
            $this->log('critical', $e->getMessage(), $e->getTrace());
        } finally {
            $this->runStop($start);
        }

        return 0;
    }
125
126
    /**
     * Normalises the loop parameters in place through Util::validateIntNull()
     * and rejects combinations the loop cannot work with.
     *
     * All three arguments are taken by reference and overwritten with the
     * validated values.
     *
     * @param int      $nanoSleep
     * @param int|null $maxCount
     * @param int|null $duration
     *
     * @throws \Exception when nanoSleep is null, or when maxCount and duration are both 0 or both null
     */
    private function checkParameters(&$nanoSleep, &$maxCount, &$duration)
    {
        $maxCount = Util::validateIntNull('maxCount', $maxCount, 32);
        $duration = Util::validateIntNull('duration', $duration, 32);
        $nanoSleep = Util::validateIntNull('nanoSleep', $nanoSleep, 63);

        if (null === $nanoSleep) {
            throw new \Exception("nanoSleep can't be null");
        }

        // Only identical values can be "both 0" or "both null".
        if ($maxCount !== $duration) {
            return;
        }

        if (0 === $maxCount) {
            throw new \Exception('maxCount and duration can not both be 0');
        }
        if (null === $maxCount) {
            throw new \Exception('maxCount and duration can not both be null');
        }
    }
149
150
    /**
     * Computes the moment at which a duration-limited run must stop.
     *
     * @param int|null $duration run length in seconds, or null for no time limit
     *
     * @return null|\DateTime a copy of the run's start time advanced by $duration seconds, or null when $duration is null
     */
    protected function getEndTime($duration)
    {
        if (null === $duration) {
            return null;
        }

        // "{$var}" is used because the "${var}" interpolation form is deprecated as of PHP 8.2.
        $interval = new \DateInterval("PT{$duration}S");
        // Clone so that adding the interval does not modify the run's own start time.
        $endTime = clone $this->run->getStartedAt();
        $endTime->add($interval);

        return $endTime;
    }
166
167
    /**
     * Handles the outcome of one attempt to fetch and run a job.
     *
     * When a job was run it is reported and the processed counter is
     * advanced. When none was available, this is logged once and, for
     * duration-limited runs, the loop pauses for a random time of up to
     * $nanoSleep nanoseconds.
     *
     * @param Job|null $job             the job that was just run, or null when none was available
     * @param bool     $noMoreJobsToRun updated by reference: true once the queue has been seen empty
     * @param int      $currentJob      updated by reference: 1-based number of the next job to process
     * @param int|null $duration        run length in seconds, or null for no time limit
     * @param int      $nanoSleep       upper bound of the random pause, in nanoseconds
     */
    protected function runCurrentJob($job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
    {
        if (null !== $job) {
            $noMoreJobsToRun = false;
            $this->reportJob($job);
            $this->updateProcessed($currentJob);
            ++$currentJob;

            return;
        }

        if (!$noMoreJobsToRun) {
            $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
            $noMoreJobsToRun = true;
        }

        if (null === $duration) {
            return;
        }

        $nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep);

        // time_nanosleep() only accepts a nanoseconds argument below one
        // billion, so whole seconds are passed separately.
        $nanoseconds = $nanoSleepTime % 1000000000;
        $seconds = (int) (($nanoSleepTime - $nanoseconds) / 1000000000);
        time_nanosleep($seconds, $nanoseconds);
    }
192
193
    /**
     * Adjusts the run limits in place.
     *
     * If the duration is at least as long as the process timeout, the
     * timeout is doubled (and the change is logged). If neither a maximum
     * count nor a duration is given, the maximum count defaults to 1.
     *
     * @param int|null $maxCount       updated by reference
     * @param int|null $duration       taken by reference, only read here
     * @param int|null $processTimeout updated by reference
     */
    public function checkMaxCountDuration(&$maxCount, &$duration, &$processTimeout)
    {
        $timeoutTooShort = null !== $duration && null !== $processTimeout && $duration >= $processTimeout;
        if ($timeoutTooShort) {
            $doubledTimeout = 2 * $processTimeout;
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".$doubledTimeout);
            $processTimeout = $doubledTimeout;
        }

        if (null === $duration && null === $maxCount) {
            $maxCount = 1;
        }
    }
209
210
    /**
     * Determine if the run loop is finished.
     *
     * The loop is finished when the job counter has passed $maxCount, when
     * there is no end time and the queue has been seen empty, or when the
     * end time has been reached.
     *
     * @param int|null       $maxCount        maximum number of jobs to process, or null for no limit
     * @param \DateTime|null $endTime         moment at which the run must stop, or null for no time limit
     * @param int|null       $currentJob      1-based number of the next job to process
     * @param bool           $noMoreJobsToRun whether the last attempt found no job
     *
     * @return bool
     */
    protected function isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun)
    {
        // Job limit exceeded.
        if (null !== $maxCount && $currentJob > $maxCount) {
            return true;
        }

        // No time limit: this means that there is a $maxCount, as one or the
        // other is forced to be not null. Stop as soon as the queue is empty.
        if (null === $endTime) {
            return $noMoreJobsToRun ? true : false;
        }

        $now = new \DateTime();

        return !($endTime > $now);
    }
239
240
    /**
     * Stamps the current run with a fresh heartbeat time and the seconds
     * elapsed since $start, then persists it.
     *
     * @param float $start timestamp (seconds, with fraction) at which the run began
     */
    protected function recordHeartbeat($start)
    {
        $currentRun = $this->run;
        $currentRun->setLastHeartbeatAt(new \DateTime());
        $currentRun->setElapsed(microtime(true) - $start);

        $this->persistRun();
    }
249
250
    /**
     * Applies an object-manager action to the current run and flushes.
     *
     * Does nothing unless the run manager is a BaseRunManager.
     *
     * @param string $action name of the object manager method to call with the run (this class uses 'persist' and 'remove')
     */
    protected function persistRun($action = 'persist')
    {
        if (!$this->runManager instanceof BaseRunManager) {
            return;
        }

        $manager = $this->runManager->getObjectManager();
        $manager->{$action}($this->run);
        $manager->flush();
    }
258
259
    /**
     * Records how many jobs the current run has processed and persists it.
     *
     * @param int $count number of jobs processed so far
     */
    protected function updateProcessed($count)
    {
        $currentRun = $this->run;
        $currentRun->setProcessed($count);

        $this->persistRun();
    }
267
268
    /**
     * Creates a new Run object, fills in its initial state and persists it
     * (persistence only happens for database-backed run managers, see
     * persistRun()).
     *
     * @param float    $start    timestamp (seconds, with fraction) at which the run began
     * @param int|null $maxCount stored on the run when not null
     * @param int|null $duration stored on the run when not null
     */
    protected function runStart($start, $maxCount = null, $duration = null)
    {
        $runClass = $this->runManager->getRunClass();
        $this->run = new $runClass();

        // The 'U.u' format needs an explicit fractional part. A float that
        // happens to be a whole number casts to a string without one, which
        // makes createFromFormat() return false, so format it explicitly.
        $startDate = \DateTime::createFromFormat('U.u', sprintf('%.6F', $start));
        if (false === $startDate) {
            // Keep the run usable even if $start could not be parsed.
            $startDate = new \DateTime();
        }

        $this->run->setLastHeartbeatAt($startDate);
        $this->run->setStartedAt($startDate);
        if (null !== $maxCount) {
            $this->run->setMaxCount($maxCount);
        }
        if (null !== $duration) {
            $this->run->setDuration($duration);
        }
        $this->run->setHostname(gethostname());
        $this->run->setPid(getmypid());
        $this->run->setProcessed(0);
        $this->persistRun();
    }
293
294
    /**
     * Closes the current run: records the end time and elapsed seconds,
     * calls persistRun('remove') and logs a summary.
     *
     * @param float $start timestamp (seconds, with fraction) at which the run began
     */
    protected function runStop($start)
    {
        $end = microtime(true);

        // Format explicitly with six decimals: 'U.u' fails to parse a float
        // whose string form has no fractional part, which would silently
        // leave the end time unset.
        $endedTime = \DateTime::createFromFormat('U.u', sprintf('%.6F', $end));
        if ($endedTime) {
            $this->run->setEndedAt($endedTime);
        }
        $this->run->setElapsed($end - $start);
        $this->persistRun('remove');
        $this->log('info', 'Ended with '.$this->run->getProcessed().' job(s) processed over '.strval($this->run->getElapsed()).' seconds.');
    }
308
309
    /**
     * Logs the outcome of a job that has just been run.
     *
     * A job in error status produces an 'error' entry carrying its message.
     * An 'info' entry with the job's elapsed time is written in every case.
     *
     * @param Job $job
     */
    protected function reportJob(Job $job)
    {
        $failed = $job->getStatus() == BaseJob::STATUS_ERROR;
        if ($failed) {
            $this->log('error', "Error with job id: {$job->getId()}\n".$job->getMessage());
        }

        $this->log('info', "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n");
    }
322
323
    /**
     * Writes a message to the logger if one is set, otherwise to the console
     * output if one is set, otherwise does nothing.
     *
     * @param string $level   name of the logger method to call (e.g. 'info', 'error', 'critical')
     * @param string $msg     the message text
     * @param array  $context extra data; passed to the logger, or dumped with print_r() on the console
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logger) {
            $this->logger->{$level}($msg, $context);

            return;
        }

        if (!$this->output) {
            return;
        }

        $timestamp = (new \DateTime())->format('c');
        $this->output->write("[$level] [".$timestamp.'] '.$msg);
        if ([] !== $context) {
            $this->output->write(print_r($context, true));
        }
        $this->output->writeln('');
    }
343
}
344