Completed
Push — master ( 01653d...13c111 )
by Matthew
06:19
created

Loop::isFinished()   B

Complexity

Conditions 6
Paths 5

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 18
ccs 9
cts 9
cp 1
rs 8.8571
c 0
b 0
f 0
cc 6
eloc 10
nc 5
nop 4
crap 6
1
<?php
2
3
namespace Dtc\QueueBundle\Run;
4
5
use Dtc\QueueBundle\Doctrine\BaseJobManager;
6
use Dtc\QueueBundle\Doctrine\BaseRunManager;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\Job;
9
use Dtc\QueueBundle\Model\JobManagerInterface;
10
use Dtc\QueueBundle\Model\Run;
11
use Dtc\QueueBundle\Model\RunManager;
12
use Dtc\QueueBundle\Model\WorkerManager;
13
use Dtc\QueueBundle\Util\Util;
14
use Psr\Log\LoggerInterface;
15
use Symfony\Component\Console\Output\OutputInterface;
16
17
class Loop
18
{
19
    /** @var Run|null $run The current (last) run; assigned by runStart() and unset until then */
20
    protected $run;
21
22
    /** @var OutputInterface|null Console output used by log() when no logger has been set */
23
    protected $output;
24
25
    /** @var LoggerInterface|null Logger that log() prefers over the console output when set */
26
    protected $logger;
27
28
    /** @var WorkerManager Runs the jobs (see runJobById() and runLoop()) */
29
    protected $workerManager;
30
31
    /** @var JobManagerInterface Job source; runJobById() additionally requires a BaseJobManager */
32
    protected $jobManager;
33
34
    /** @var RunManager Supplies the run class and records job runs; persists runs when it is a BaseRunManager */
35
    protected $runManager;
36
37
    /** @var int|null Process timeout copied onto each run in runStart(); unset until setProcessTimeout() is called */
38
    protected $processTimeout;
39
40 3
    /**
     * Stores the collaborators the loop needs to fetch, run and record jobs.
     *
     * @param WorkerManager       $workerManager
     * @param JobManagerInterface $jobManager
     * @param RunManager          $runManager
     */
    public function __construct(
        WorkerManager $workerManager,
        JobManagerInterface $jobManager,
        RunManager $runManager
    ) {
        $this->runManager = $runManager;
        $this->jobManager = $jobManager;
        $this->workerManager = $workerManager;
    }
49
50
    /**
     * Returns the process timeout that runStart() copies onto each new run.
     *
     * @return int|null null when setProcessTimeout() has not been called
     */
    public function getProcessTimeout()
    {
        $timeout = $this->processTimeout;

        return $timeout;
    }
57
58
    /**
     * Sets the process timeout that runStart() copies onto each new run.
     *
     * @param int $processTimeout
     */
    public function setProcessTimeout(int $processTimeout)
    {
        $timeout = $processTimeout;
        $this->processTimeout = $timeout;
    }
65
66
    /**
     * Sets the logger; once set, log() sends every message to it instead of the console output.
     *
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $target = $logger;
        $this->logger = $target;
    }
70
71 1
    /**
     * Sets the console output that log() writes to when no logger has been set.
     *
     * @param OutputInterface $output
     */
    public function setOutput(OutputInterface $output)
    {
        $target = $output;
        $this->output = $target;
    }
75
76
    /**
     * Returns the run object created by the most recent runStart() call.
     *
     * @return Run|null null when no run has been started yet
     */
    public function getRun()
    {
        $currentRun = $this->run;

        return $currentRun;
    }
85
86
    /**
     * Runs a single job, looked up by its id, inside its own run record.
     *
     * The run opened by runStart() is always closed by runStop(), including when
     * the job manager is unsupported, the job cannot be found, or running the job throws.
     *
     * @param float $start start time as a unix timestamp with microseconds (as returned by microtime(true))
     * @param mixed $jobId identifier handed to the job repository's find()
     *
     * @throws \Exception when the job manager is not a BaseJobManager
     */
    public function runJobById($start, $jobId)
    {
        $this->runStart($start);

        try {
            if (!$this->jobManager instanceof BaseJobManager) {
                throw new \Exception("Can't get job by id when not using a database/datastore backed queue (such as mongodb or an RDBMS)");
            }

            /** @var Job $job */
            $job = $this->jobManager->getRepository()->find($jobId);
            if (!$job) {
                $this->log('error', "Job id is not found: {$jobId}");

                return;
            }

            $job = $this->workerManager->runJob($job);
            $this->reportJob($job);
            $this->run->setProcessed(1);
        } finally {
            // Close out the run on every path so a failure does not leave an orphaned run record behind.
            $this->runStop($start);
        }
    }
113
114
    /**
     * Runs jobs repeatedly until isFinished() reports the loop is done.
     *
     * Exceptions raised inside the loop are logged at 'critical' level and end the
     * loop; the run is still closed with runStop() afterwards.
     *
     * @param float       $start      start time as a unix timestamp with microseconds (as returned by microtime(true))
     * @param string|null $workerName passed through to WorkerManager::run()
     * @param string|null $methodName passed through to WorkerManager::run()
     * @param null|int    $maxCount   maximum number of jobs to process, or null for no count limit
     * @param null|int    $duration   number of seconds to keep running, or null for no time limit
     * @param int         $nanoSleep  upper bound, in nanoseconds, of the random sleep taken when no job is available
     *
     * @return int always 0
     *
     * @throws \Exception when the parameters fail validation in checkParameters()
     */
    public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
    {
        $this->checkParameters($nanoSleep, $maxCount, $duration);
        $this->workerManager->setLoggingFunc([$this, 'log']);
        $this->runStart($start, $maxCount, $duration);
        try {
            $this->log('info', 'Starting up a new job...');

            $endTime = $this->getEndTime($duration);
            $currentJob = 1;
            $noMoreJobsToRun = false;
            do {
                $this->recordHeartbeat($start);
                $job = $this->workerManager->run($workerName, $methodName, true, $this->run->getId());
                // Updates $noMoreJobsToRun and $currentJob by reference.
                $this->runCurrentJob($job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
            } while (!$this->isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun));
        } catch (\Exception $e) {
            // Uncaught error: possibly with QueueBundle itself
            $this->log('critical', $e->getMessage(), $e->getTrace());
        }
        $this->runStop($start);

        return 0;
    }
144
145
    /**
     * Validates the loop parameters in place and rejects unusable combinations.
     *
     * Each argument is replaced with the result of Util::validateIntNull().
     *
     * @param int      $nanoSleep
     * @param null|int $maxCount
     * @param null|int $duration
     *
     * @throws \Exception when nanoSleep is null, or maxCount and duration are both 0 or both null
     */
    private function checkParameters(&$nanoSleep, &$maxCount, &$duration)
    {
        $maxCount = Util::validateIntNull('maxCount', $maxCount, 32);
        $duration = Util::validateIntNull('duration', $duration, 32);
        $nanoSleep = Util::validateIntNull('nanoSleep', $nanoSleep, 63);

        if (null === $nanoSleep) {
            throw new \Exception("nanoSleep can't be null");
        }

        $bothZero = 0 === $maxCount && 0 === $duration;
        if ($bothZero) {
            throw new \Exception('maxCount and duration can not both be 0');
        }

        $bothNull = null === $maxCount && null === $duration;
        if ($bothNull) {
            throw new \Exception('maxCount and duration can not both be null');
        }
    }
168
169
    /**
     * Computes the moment the loop should stop: the run's start time plus $duration seconds.
     *
     * @param int|null $duration number of seconds the loop may run, or null for no time limit
     *
     * @return null|\DateTime null when no duration was given
     */
    protected function getEndTime($duration)
    {
        if (null === $duration) {
            return null;
        }

        // "{$var}" is used rather than "${var}", which is deprecated as of PHP 8.2.
        $interval = new \DateInterval("PT{$duration}S");
        // Clone so that adding the interval does not mutate the run's own start time.
        $endTime = clone $this->run->getStartedAt();
        $endTime->add($interval);

        return $endTime;
    }
185
186
    /**
     * Handles the outcome of one loop iteration.
     *
     * When a job was run it is reported and counted. When no job was available the
     * fact is logged once, and, if the loop is time-bound, the process sleeps for a
     * random time of up to $nanoSleep nanoseconds before the next attempt.
     *
     * @param Job|null $job             the job that was just run, or null when none was available
     * @param bool     $noMoreJobsToRun updated by reference: true once the queue was found empty
     * @param int      $currentJob      updated by reference: 1-based number of the next job to process
     * @param int|null $duration        loop duration in seconds, or null when the loop is not time-bound
     * @param int      $nanoSleep       upper bound of the random sleep, in nanoseconds
     */
    protected function runCurrentJob($job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
    {
        if (null !== $job) {
            $noMoreJobsToRun = false;
            $this->reportJob($job);
            $this->updateProcessed($currentJob);
            ++$currentJob;

            return;
        }

        if (!$noMoreJobsToRun) {
            $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
            $noMoreJobsToRun = true;
        }

        if (null === $duration) {
            return;
        }

        $nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep);
        // time_nanosleep() requires the nanosecond argument to be below one second,
        // so a delay of one second or more has to be split into its two parts.
        $nanosPerSecond = 1000000000;
        time_nanosleep(intdiv($nanoSleepTime, $nanosPerSecond), $nanoSleepTime % $nanosPerSecond);
    }
211
212
    /**
     * Adjusts the loop limits in place before a run.
     *
     * Doubles the process timeout when the duration is not shorter than it, and
     * defaults maxCount to 1 when neither a max count nor a duration was given.
     *
     * @param int|null $maxCount       updated by reference
     * @param int|null $duration       passed by reference but not modified here
     * @param int|null $processTimeout updated by reference
     */
    public function checkMaxCountDuration(&$maxCount, &$duration, &$processTimeout)
    {
        $durationReachesTimeout = null !== $duration
            && null !== $processTimeout
            && $duration >= $processTimeout;

        if ($durationReachesTimeout) {
            $doubledTimeout = 2 * $processTimeout;
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".$doubledTimeout);
            $processTimeout = $doubledTimeout;
        }

        $noLimitGiven = null === $maxCount && null === $duration;
        if ($noLimitGiven) {
            $maxCount = 1;
        }
    }
228
229
    /**
     * Determine if the run loop is finished.
     *
     * The loop is finished when the job counter has passed $maxCount, or, when the
     * loop is not time-bound, as soon as no more jobs are available, or, when it is
     * time-bound, once the end time is no longer in the future.
     *
     * @param int|null       $maxCount
     * @param \DateTime|null $endTime
     * @param int            $currentJob
     * @param bool           $noMoreJobsToRun
     *
     * @return bool
     */
    protected function isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun)
    {
        $countExhausted = null !== $maxCount && $currentJob > $maxCount;
        if ($countExhausted) {
            return true;
        }

        // Without an end time there is a $maxCount, as one or the other is forced to be not null.
        if (null === $endTime) {
            return $noMoreJobsToRun ? true : false;
        }

        $stillTimeLeft = $endTime > new \DateTime();

        return !$stillTimeLeft;
    }
257
258
    /**
     * Stamps the current run with a fresh heartbeat and elapsed time, then persists it.
     *
     * @param float $start start time as a unix timestamp with microseconds
     */
    protected function recordHeartbeat($start)
    {
        $currentRun = $this->run;

        $currentRun->setLastHeartbeatAt(new \DateTime());
        $secondsElapsed = microtime(true) - $start;
        $currentRun->setElapsed($secondsElapsed);

        $this->persistRun();
    }
267
268
    /**
     * Applies an object-manager action to the current run and flushes it.
     *
     * Does nothing unless the run manager is a BaseRunManager.
     *
     * @param string $action name of the object manager method to call with the run (e.g. 'persist' or 'remove')
     */
    protected function persistRun($action = 'persist')
    {
        if (!$this->runManager instanceof BaseRunManager) {
            return;
        }

        $manager = $this->runManager->getObjectManager();
        $manager->{$action}($this->run);
        $manager->flush();
    }
276
277
    /**
     * Records the number of processed jobs on the current run and persists it.
     *
     * @param int $count
     */
    protected function updateProcessed($count)
    {
        $currentRun = $this->run;
        $currentRun->setProcessed($count);

        $this->persistRun();
    }
285
286
    /**
     * Creates the run object for this execution, fills in its initial state and
     * persists it (persistence only happens when the run manager supports it).
     *
     * @param float    $start    start time as a unix timestamp with microseconds (as returned by microtime(true))
     * @param int|null $maxCount
     * @param int|null $duration
     */
    protected function runStart($start, $maxCount = null, $duration = null)
    {
        $runClass = $this->runManager->getRunClass();
        $this->run = new $runClass();

        // A float without a fractional part casts to a string with no decimal point,
        // which the 'U.u' format cannot parse; always render six decimals explicitly.
        $startDate = \DateTime::createFromFormat('U.u', sprintf('%.6F', $start));
        if (false === $startDate) {
            // Never hand `false` to the run setters; fall back to the current time.
            $startDate = new \DateTime();
        }

        $this->run->setLastHeartbeatAt($startDate);
        $this->run->setStartedAt($startDate);
        if (null !== $maxCount) {
            $this->run->setMaxCount($maxCount);
        }
        if (null !== $duration) {
            $this->run->setDuration($duration);
        }
        $this->run->setHostname(gethostname());
        $this->run->setPid(getmypid());
        $this->run->setProcessed(0);
        $this->run->setProcessTimeout($this->processTimeout);
        $this->persistRun();
    }
312
313
    /**
     * Closes the current run: records its end time and elapsed time, removes it
     * via persistRun('remove') and logs a summary.
     *
     * @param float $start start time as a unix timestamp with microseconds (as returned by microtime(true))
     */
    protected function runStop($start)
    {
        $end = microtime(true);

        // A float without a fractional part casts to a string with no decimal point,
        // which the 'U.u' format cannot parse; always render six decimals explicitly.
        $endedTime = \DateTime::createFromFormat('U.u', sprintf('%.6F', $end));
        if ($endedTime) {
            $this->run->setEndedAt($endedTime);
        }
        $this->run->setElapsed($end - $start);
        $this->persistRun('remove');
        $this->log('info', 'Ended with '.$this->run->getProcessed().' job(s) processed over '.strval($this->run->getElapsed()).' seconds.');
    }
327
328
    /**
     * Records a finished job with the run manager and logs its outcome.
     *
     * An error entry is logged first when the job ended in error status; the
     * "finished" entry is logged in every case.
     *
     * @param Job $job
     */
    protected function reportJob(Job $job)
    {
        $this->runManager->recordJobRun($job);

        $failed = BaseJob::STATUS_ERROR == $job->getStatus();
        if ($failed) {
            $this->log('error', "Error with job id: {$job->getId()}\n".$job->getMessage());
        }

        $this->log('info', "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n");
    }
343
344
    /**
     * Logs a message through the logger when one is set, otherwise writes it to
     * the console output when one is set; does nothing when neither is available.
     *
     * @param string $level   log level; also the name of the logger method that gets called
     * @param string $msg
     * @param array  $context extra data; printed with print_r() when writing to the console output
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logger) {
            $this->logger->{$level}($msg, $context);

            return;
        }

        if (!$this->output) {
            return;
        }

        $timestamp = (new \DateTime())->format('c');
        $this->output->write("[$level] [".$timestamp.'] '.$msg);
        if ([] !== $context) {
            $this->output->write(print_r($context, true));
        }
        $this->output->writeln('');
    }
364
}
365