Completed
Push — master ( ad518d...0b0f4b )
by Matthew
05:51
created

Loop::persistRun()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 1
crap 2
1
<?php
2
3
namespace Dtc\QueueBundle\Run;
4
5
use Dtc\QueueBundle\Doctrine\BaseJobManager;
6
use Dtc\QueueBundle\Doctrine\BaseRunManager;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\Job;
9
use Dtc\QueueBundle\Model\JobManagerInterface;
10
use Dtc\QueueBundle\Model\Run;
11
use Dtc\QueueBundle\Model\RunManager;
12
use Dtc\QueueBundle\Model\WorkerManager;
13
use Dtc\QueueBundle\Util\Util;
14
use Psr\Log\LoggerInterface;
15
use Symfony\Component\Console\Output\OutputInterface;
16
17
/**
 * Drives the execution of queued jobs, either a single job looked up by id
 * or a loop bounded by a maximum job count and/or a duration, while keeping
 * a Run record up to date with heartbeat, elapsed time and processed count.
 */
class Loop
{
    /** Upper bound (exclusive) of the nanoseconds argument accepted by time_nanosleep(). */
    const NANOSECONDS_PER_SECOND = 1000000000;

    /** @var Run $run The current (last) run record; created by runStart() */
    protected $run;

    /** @var OutputInterface Optional console output, used by log() when no logger is set */
    protected $output;

    /** @var LoggerInterface Optional logger; takes precedence over $output in log() */
    protected $logger;

    /** @var WorkerManager */
    protected $workerManager;

    /** @var JobManagerInterface */
    protected $jobManager;

    /** @var RunManager */
    protected $runManager;

    /**
     * @param WorkerManager       $workerManager runs the jobs
     * @param JobManagerInterface $jobManager    used by runJobById() to look a job up
     * @param RunManager          $runManager    supplies the run class and records job runs
     */
    public function __construct(
        WorkerManager $workerManager,
        JobManagerInterface $jobManager,
        RunManager $runManager)
    {
        $this->workerManager = $workerManager;
        $this->jobManager = $jobManager;
        $this->runManager = $runManager;
    }

    /**
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @param OutputInterface $output
     */
    public function setOutput(OutputInterface $output)
    {
        $this->output = $output;
    }

    /**
     * The current (last) run object.
     *
     * @return Run|null
     */
    public function getRun()
    {
        return $this->run;
    }

    /**
     * Runs a single job, looked up by its id, inside its own run record.
     *
     * @param float $start microtime(true)-style timestamp marking the start of the run
     * @param mixed $jobId identifier handed to the job repository's find()
     *
     * @throws \Exception when the job manager is not a BaseJobManager
     */
    public function runJobById($start, $jobId)
    {
        $this->runStart($start);

        if (!$this->jobManager instanceof BaseJobManager) {
            throw new \Exception("Can't get job by id when not using a database/datastore backed queue (such as mongodb or an RDBMS)");
        }

        /** @var Job $job */
        $job = $this->jobManager->getRepository()->find($jobId);
        if (!$job) {
            $this->log('error', "Job id is not found: {$jobId}");
            $this->runStop($start);

            return;
        }

        $job = $this->workerManager->runJob($job);
        $this->reportJob($job);
        $this->run->setProcessed(1);
        $this->runStop($start);
    }

    /**
     * Runs jobs repeatedly until isFinished() reports that the loop is done.
     *
     * Any \Exception raised inside the loop is logged as critical and ends
     * the loop; the run is stopped afterwards in either case.
     *
     * @param float    $start      microtime(true)-style timestamp marking the start of the run
     * @param mixed    $workerName passed through to WorkerManager::run()
     * @param mixed    $methodName passed through to WorkerManager::run()
     * @param null|int $maxCount
     * @param null|int $duration   seconds
     * @param int      $nanoSleep  upper bound, in nanoseconds, of the random wait between polls
     *
     * @return int always 0
     *
     * @throws \Exception when the parameters fail validation
     */
    public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
    {
        $this->checkParameters($nanoSleep, $maxCount, $duration);
        $this->workerManager->setLoggingFunc([$this, 'log']);
        $this->runStart($start, $maxCount, $duration);
        try {
            $this->log('info', 'Staring up a new job...');

            $endTime = $this->getEndTime($duration);
            $currentJob = 1;
            $noMoreJobsToRun = false;
            do {
                $this->recordHeartbeat($start);
                $job = $this->workerManager->run($workerName, $methodName, true, $this->run->getId());
                $this->runCurrentJob($job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
            } while (!$this->isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun));
        } catch (\Exception $e) {
            // Uncaught error: possibly with QueueBundle itself
            $this->log('critical', $e->getMessage(), $e->getTrace());
        }
        $this->runStop($start);

        return 0;
    }

    /**
     * Validates the loop parameters; each one is replaced (by reference)
     * with the value returned by Util::validateIntNull().
     *
     * @param int      $nanoSleep
     * @param null|int $maxCount
     * @param null|int $duration
     *
     * @throws \Exception when nanoSleep is null, or when maxCount and duration are both 0 or both null
     */
    private function checkParameters(&$nanoSleep, &$maxCount, &$duration)
    {
        $maxCount = Util::validateIntNull('maxCount', $maxCount, 32);
        $duration = Util::validateIntNull('duration', $duration, 32);
        $nanoSleep = Util::validateIntNull('nanoSleep', $nanoSleep, 63);

        if (null === $nanoSleep) {
            throw new \Exception("nanoSleep can't be null");
        }
        if (0 === $maxCount && 0 === $duration) {
            throw new \Exception('maxCount and duration can not both be 0');
        }
        if (null === $maxCount && null === $duration) {
            throw new \Exception('maxCount and duration can not both be null');
        }
    }

    /**
     * Computes when the loop must stop: the run's start time plus $duration seconds.
     *
     * @param int|null $duration seconds
     *
     * @return null|\DateTime null when no duration was given
     */
    protected function getEndTime($duration)
    {
        if (null === $duration) {
            return null;
        }

        // Clone so that the run's own start time is not shifted by add().
        $endTime = clone $this->run->getStartedAt();
        $endTime->add(new \DateInterval("PT{$duration}S"));

        return $endTime;
    }

    /**
     * Handles the outcome of one poll: reports a job that was run, or notes
     * that the queue is empty and (for duration-bound loops) waits a random
     * amount of time before the next poll.
     *
     * @param Job|null $job             the job that was run, or null when none was available
     * @param bool     $noMoreJobsToRun updated by reference
     * @param int      $currentJob      1-based counter, incremented by reference after each job
     * @param int|null $duration
     * @param int      $nanoSleep       upper bound, in nanoseconds, of the random wait
     */
    protected function runCurrentJob($job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
    {
        if (null !== $job) {
            $noMoreJobsToRun = false;
            $this->reportJob($job);
            $this->updateProcessed($currentJob);
            ++$currentJob;

            return;
        }

        if (!$noMoreJobsToRun) {
            // Only log the transition, not every empty poll.
            $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
            $noMoreJobsToRun = true;
        }
        if (null !== $duration) {
            $nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep);
            // time_nanosleep() only accepts nanoseconds below one second,
            // so whole seconds have to be passed separately.
            $sleepParts = $this->splitNanoseconds($nanoSleepTime);
            time_nanosleep($sleepParts[0], $sleepParts[1]);
        }
    }

    /**
     * Splits a nanosecond count into the (seconds, nanoseconds) pair
     * expected by time_nanosleep().
     *
     * @param int $totalNanoseconds non-negative number of nanoseconds
     *
     * @return int[] two elements: whole seconds, then the remaining nanoseconds (< 1e9)
     */
    private function splitNanoseconds($totalNanoseconds)
    {
        $nanoseconds = $totalNanoseconds % self::NANOSECONDS_PER_SECOND;
        // The numerator is an exact multiple of the divisor, so this division is lossless.
        $seconds = (int) (($totalNanoseconds - $nanoseconds) / self::NANOSECONDS_PER_SECOND);

        return [$seconds, $nanoseconds];
    }

    /**
     * Adjusts the given limits in place: doubles the process timeout when the
     * duration would reach it, and defaults maxCount to 1 when neither a
     * maxCount nor a duration was supplied.
     *
     * @param int|null $maxCount
     * @param int|null $duration
     * @param int|null $processTimeout
     */
    public function checkMaxCountDuration(&$maxCount, &$duration, &$processTimeout)
    {
        if (null !== $duration && null !== $processTimeout && $duration >= $processTimeout) {
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".(2 * $processTimeout));
            $processTimeout *= 2;
        }

        if (null === $maxCount && null === $duration) {
            $maxCount = 1;
        }
    }

    /**
     * Determine if the run loop is finished.
     *
     * @param int|null       $maxCount
     * @param \DateTime|null $endTime
     * @param int            $currentJob
     * @param bool           $noMoreJobsToRun
     *
     * @return bool
     */
    protected function isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun)
    {
        // The job budget is used up.
        if (null !== $maxCount && $currentJob > $maxCount) {
            return true;
        }

        // No deadline: stop as soon as the queue has nothing left to run.
        if (null === $endTime) {
            return (bool) $noMoreJobsToRun;
        }

        // With a deadline, keep polling until it has passed.
        return !($endTime > new \DateTime());
    }

    /**
     * Stamps the run with the current time and elapsed seconds, then persists it.
     *
     * @param float $start microtime(true)-style timestamp marking the start of the run
     */
    protected function recordHeartbeat($start)
    {
        $this->run->setLastHeartbeatAt(new \DateTime());
        $this->run->setElapsed(microtime(true) - $start);
        $this->persistRun();
    }

    /**
     * Applies $action to the run on the object manager and flushes.
     * Does nothing unless the run manager is a BaseRunManager.
     *
     * @param string $action name of the object manager method to call with the run
     */
    protected function persistRun($action = 'persist')
    {
        if (!$this->runManager instanceof BaseRunManager) {
            return;
        }

        $objectManager = $this->runManager->getObjectManager();
        $objectManager->$action($this->run);
        $objectManager->flush();
    }

    /**
     * Stores the processed-job count on the run and persists it.
     *
     * @param int $count
     */
    protected function updateProcessed($count)
    {
        $this->run->setProcessed($count);
        $this->persistRun();
    }

    /**
     * Creates and initialises the run record, then persists it if appropriate.
     *
     * @param float    $start    microtime(true)-style timestamp marking the start of the run
     * @param int|null $maxCount
     * @param int|null $duration
     */
    protected function runStart($start, $maxCount = null, $duration = null)
    {
        $runClass = $this->runManager->getRunClass();
        $this->run = new $runClass();
        $startDate = $this->createDateTimeFromTimestamp($start);
        $this->run->setLastHeartbeatAt($startDate);
        $this->run->setStartedAt($startDate);
        if (null !== $maxCount) {
            $this->run->setMaxCount($maxCount);
        }
        if (null !== $duration) {
            $this->run->setDuration($duration);
        }
        $this->run->setHostname(gethostname());
        $this->run->setPid(getmypid());
        $this->run->setProcessed(0);
        $this->persistRun();
    }

    /**
     * Records the end time and elapsed seconds on the run, applies the
     * 'remove' action to it and logs a summary.
     *
     * @param float|int|null $start microtime(true)-style timestamp marking the start of the run
     */
    protected function runStop($start)
    {
        $end = microtime(true);
        $this->run->setEndedAt($this->createDateTimeFromTimestamp($end));
        $this->run->setElapsed($end - $start);
        $this->persistRun('remove');
        $this->log('info', 'Ended with '.$this->run->getProcessed().' job(s) processed over '.strval($this->run->getElapsed()).' seconds.');
    }

    /**
     * Converts a microtime(true)-style timestamp into a \DateTime.
     *
     * The float is formatted explicitly because a plain string cast drops the
     * fractional part for whole-second values (and may use a locale-specific
     * decimal separator on older PHP versions), which makes the 'U.u' format
     * fail to parse.
     *
     * @param float|int|null $timestamp seconds since the Unix epoch
     *
     * @return \DateTime the parsed time, or the current time if parsing still fails
     */
    private function createDateTimeFromTimestamp($timestamp)
    {
        $dateTime = \DateTime::createFromFormat('U.u', number_format((float) $timestamp, 6, '.', ''));

        return false !== $dateTime ? $dateTime : new \DateTime();
    }

    /**
     * Records the job run with the run manager and logs its outcome.
     *
     * @param Job $job
     */
    protected function reportJob(Job $job)
    {
        $this->runManager->recordJobRun($job);

        if (BaseJob::STATUS_ERROR == $job->getStatus()) {
            $message = "Error with job id: {$job->getId()}\n".$job->getMessage();
            $this->log('error', $message);
        }

        $message = "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n";
        $this->log('info', $message);
    }

    /**
     * Sends a message to the logger if one is set, otherwise to the console
     * output if one is set; otherwise the message is dropped.
     *
     * @param string $level   name of the logger method to call (e.g. 'info', 'error')
     * @param string $msg
     * @param array  $context
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logger) {
            $this->logger->$level($msg, $context);

            return;
        }

        if ($this->output) {
            $date = new \DateTime();
            $this->output->write("[$level] [".$date->format('c').'] '.$msg);
            if (!empty($context)) {
                $this->output->write(print_r($context, true));
            }
            $this->output->writeln('');
        }
    }
}
345