Completed
Push — master ( 745336...aadd5d )
by Matthew
06:17
created

Loop::runJobById()   B

Complexity

Conditions 3
Paths 3

Size

Total Lines 24
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 3.0884

Importance

Changes 0
Metric Value
dl 0
loc 24
ccs 11
cts 14
cp 0.7856
rs 8.9713
c 0
b 0
f 0
cc 3
eloc 14
nc 3
nop 2
crap 3.0884
1
<?php
2
3
namespace Dtc\QueueBundle\Run;
4
5
use Dtc\QueueBundle\Doctrine\BaseJobManager;
6
use Dtc\QueueBundle\Doctrine\BaseRunManager;
7
use Dtc\QueueBundle\Model\BaseJob;
8
use Dtc\QueueBundle\Model\Job;
9
use Dtc\QueueBundle\Model\JobManagerInterface;
10
use Dtc\QueueBundle\Model\Run;
11
use Dtc\QueueBundle\Model\RunManager;
12
use Dtc\QueueBundle\Model\WorkerManager;
13
use Dtc\QueueBundle\Util\Util;
14
use Psr\Log\LoggerInterface;
15
use Symfony\Component\Console\Output\OutputInterface;
16
17
class Loop
18
{
19
    /** @var Run $run */
20
    protected $run;
21
22
    /** @var OutputInterface */
23
    protected $output;
24
25
    /** @var LoggerInterface */
26
    protected $logger;
27
28
    /** @var WorkerManager */
29
    protected $workerManager;
30
31
    /** @var JobManagerInterface */
32
    protected $jobManager;
33
34
    /** @var RunManager */
35
    protected $runManager;
36
37 3
    /**
     * Stores the collaborators the loop delegates to.
     *
     * @param WorkerManager       $workerManager runs the jobs
     * @param JobManagerInterface $jobManager    used by runJobById() to look a job up
     * @param RunManager          $runManager    supplies the run class; the run is only persisted
     *                                           when this is a BaseRunManager
     */
    public function __construct(WorkerManager $workerManager, JobManagerInterface $jobManager, RunManager $runManager)
    {
        $this->runManager = $runManager;
        $this->jobManager = $jobManager;
        $this->workerManager = $workerManager;
    }
46
47
    /**
     * Registers a PSR-3 logger.
     *
     * Once a logger is set, log() sends every message to it and no longer
     * writes to the console output.
     *
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
51
52 1
    /**
     * Registers the console output that log() writes to when no logger is set.
     *
     * @param OutputInterface $output
     */
    public function setOutput(OutputInterface $output)
    {
        $this->output = $output;
    }
56
57
    /**
     * Returns the run object created by the most recent runStart() call.
     *
     * @return Run|null null as long as no run has been started
     */
    public function getRun()
    {
        return $this->run;
    }
66
67
    /**
     * Runs a single job, looked up by its id, inside its own run.
     *
     * A run is started first and is always stopped again, even when the job
     * manager is not datastore-backed or the job itself throws. Previously
     * those paths skipped runStop(), leaving the run record behind in the
     * datastore.
     *
     * @param float $start start timestamp of the run, as returned by microtime(true)
     * @param mixed $jobId identifier passed to the job repository's find()
     *
     * @throws \Exception when the job manager is not a BaseJobManager
     */
    public function runJobById($start, $jobId)
    {
        $this->runStart($start);

        try {
            if (!$this->jobManager instanceof BaseJobManager) {
                throw new \Exception("Can't get job by id when not using a database/datastore backed queue (such as mongodb or an RDBMS)");
            }

            /** @var Job|null $job */
            $job = $this->jobManager->getRepository()->find($jobId);
            if (!$job) {
                $this->log('error', "Job id is not found: {$jobId}");

                return;
            }

            $job = $this->workerManager->runJob($job);
            $this->reportJob($job);
            $this->run->setProcessed(1);
        } finally {
            // Runs on every exit path (normal, early return, exception) so the run is always closed.
            $this->runStop($start);
        }
    }
94
95
    /**
     * Runs jobs in a loop until isFinished() reports that the run is over.
     *
     * Exceptions raised while looping are logged at "critical" level and
     * swallowed, so the method still returns 0. The run is stopped on every
     * exit path.
     *
     * @param float       $start      start timestamp of the run, as returned by microtime(true)
     * @param string|null $workerName forwarded unchanged to WorkerManager::run() (type assumed - TODO confirm)
     * @param string|null $methodName forwarded unchanged to WorkerManager::run() (type assumed - TODO confirm)
     * @param int|null    $maxCount   maximum number of jobs to process, or null for no limit
     * @param int|null    $duration   number of seconds to keep running, or null for no time limit
     * @param int         $nanoSleep  upper bound, in nanoseconds, of the random pause taken when no job was found
     *
     * @return int always 0
     *
     * @throws \Exception when checkParameters() rejects the arguments
     */
    public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
    {
        $this->checkParameters($nanoSleep, $maxCount, $duration);
        $this->workerManager->setLoggingFunc([$this, 'log']);
        $this->runStart($start, $maxCount, $duration);

        try {
            $this->log('info', 'Starting up a new job...');

            // Null when no duration was given; isFinished() handles that case explicitly.
            $endTime = $this->getEndTime($duration);
            $currentJob = 1;
            $noMoreJobsToRun = false;

            do {
                $this->recordHeartbeat($start);
                // A falsy $job is treated by runCurrentJob() as "no job available".
                $job = $this->workerManager->run($workerName, $methodName, true, $this->run->getId());
                $this->runCurrentJob($job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
            } while (!$this->isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun));
        } catch (\Exception $e) {
            // Uncaught error: possibly with QueueBundle itself
            $this->log('critical', $e->getMessage(), $e->getTrace());
        } finally {
            // Also reached when something that is not an \Exception escapes, so the run is always closed.
            $this->runStop($start);
        }

        return 0;
    }
125
126
    /**
     * Validates the loop limits and writes the validated values back through the references.
     *
     * Each value is passed through Util::validateIntNull(); the third argument
     * is presumably a bit width (32 / 63) - confirm against Util.
     *
     * @param mixed $nanoSleep must not validate to null
     * @param mixed $maxCount  may be null, but not together with $duration
     * @param mixed $duration  may be null, but not together with $maxCount
     *
     * @throws \Exception when nanoSleep is null, or maxCount and duration are both 0 or both null
     */
    private function checkParameters(&$nanoSleep, &$maxCount, &$duration)
    {
        $maxCount = Util::validateIntNull('maxCount', $maxCount, 32);
        $duration = Util::validateIntNull('duration', $duration, 32);
        $nanoSleep = Util::validateIntNull('nanoSleep', $nanoSleep, 63);

        if (null === $nanoSleep) {
            throw new \Exception("nanoSleep can't be null");
        }

        $bothZero = 0 === $maxCount && 0 === $duration;
        if ($bothZero) {
            throw new \Exception('maxCount and duration can not both be 0');
        }

        $bothNull = null === $maxCount && null === $duration;
        if ($bothNull) {
            throw new \Exception('maxCount and duration can not both be null');
        }
    }
149
150
    /**
     * Computes the moment at which a time-limited run has to stop.
     *
     * @param int|null $duration run length in seconds, or null when the run has no time limit
     *
     * @return \DateTime|null the run's start time plus $duration seconds, or null when $duration is null
     */
    protected function getEndTime($duration)
    {
        if (null === $duration) {
            return null;
        }

        // "{$duration}" instead of "${duration}": the latter interpolation form is deprecated as of PHP 8.2.
        $interval = new \DateInterval("PT{$duration}S");

        // Clone so that adding the interval does not modify the run's own start time.
        $endTime = clone $this->run->getStartedAt();
        $endTime->add($interval);

        return $endTime;
    }
166
167
    /**
     * Books the outcome of one loop iteration.
     *
     * With a job: reports it, stores the processed count and advances the counter.
     * Without a job: logs once that the queue is empty and, for time-limited runs,
     * pauses for a random time of up to $nanoSleep nanoseconds before the next poll.
     *
     * @param Job|null $job             the job that was just run, or a falsy value when none was available
     * @param bool     $noMoreJobsToRun set to false when a job ran, true when none was available (by reference)
     * @param int      $currentJob      1-based number of the job being processed; incremented after a job ran (by reference)
     * @param int|null $duration        run length in seconds; the pause is only taken when this is not null
     * @param int      $nanoSleep       upper bound of the random pause, in nanoseconds
     */
    protected function runCurrentJob($job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
    {
        if ($job) {
            $noMoreJobsToRun = false;
            $this->reportJob($job);
            $this->updateProcessed($currentJob);
            ++$currentJob;

            return;
        }

        if (!$noMoreJobsToRun) {
            $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
            $noMoreJobsToRun = true;
        }

        if (null === $duration) {
            return;
        }

        $nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep);

        // time_nanosleep() only accepts a nanosecond part below one billion, while nanoSleep may be
        // configured larger than that; carry whole seconds over into the seconds argument.
        $nanoseconds = $nanoSleepTime % 1000000000;
        $seconds = (int) (($nanoSleepTime - $nanoseconds) / 1000000000);
        time_nanosleep($seconds, $nanoseconds);
    }
192
193
    /**
     * Adjusts the run limits in place before a run is launched.
     *
     * - When the duration is at least as long as the process timeout, the process timeout is doubled.
     * - When neither a maximum count nor a duration is given, the maximum count defaults to 1.
     *
     * @param int|null $maxCount       maximum number of jobs (by reference)
     * @param int|null $duration       run length (by reference; only read here)
     * @param int|null $processTimeout process timeout, compared against $duration (by reference)
     */
    public function checkMaxCountDuration(&$maxCount, &$duration, &$processTimeout)
    {
        $timeoutTooShort = null !== $duration && null !== $processTimeout && $duration >= $processTimeout;
        if ($timeoutTooShort) {
            $doubledTimeout = 2 * $processTimeout;
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".$doubledTimeout);
            $processTimeout = $doubledTimeout;
        }

        $noLimitGiven = null === $maxCount && null === $duration;
        if ($noLimitGiven) {
            $maxCount = 1;
        }
    }
209
210
    /**
     * Determine if the run loop is finished.
     *
     * The loop is finished when the job counter has passed $maxCount, when the
     * end time has been reached, or - for runs without an end time - as soon as
     * no more jobs are available.
     *
     * @param int|null       $maxCount        maximum number of jobs, or null for no limit
     * @param \DateTime|null $endTime         moment the run must stop, or null when it has no time limit
     * @param int            $currentJob      1-based number of the next job to process
     * @param bool           $noMoreJobsToRun whether the last poll found no job
     *
     * @return bool
     */
    protected function isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun)
    {
        $countExhausted = null !== $maxCount && $currentJob > $maxCount;
        if ($countExhausted) {
            return true;
        }

        if (null === $endTime) {
            // No end time means there is a $maxCount, as one or the other is forced to be not null:
            // keep going until the queue runs dry.
            return $noMoreJobsToRun ? true : false;
        }

        $now = new \DateTime();

        return !($endTime > $now);
    }
238
239
    /**
     * Stamps the run with the current time and the elapsed time so far, and saves it
     * when the run manager is datastore-backed.
     *
     * @param float $start start timestamp of the run, as returned by microtime(true)
     */
    protected function recordHeartbeat($start)
    {
        $run = $this->run;
        $run->setLastHeartbeatAt(new \DateTime());
        $run->setElapsed(microtime(true) - $start);

        if (!$this->runManager instanceof BaseRunManager) {
            return;
        }

        // NOTE(review): the same persist-and-flush sequence is repeated in updateProcessed() and runStart().
        $objectManager = $this->runManager->getObjectManager();
        $objectManager->persist($run);
        $objectManager->flush();
    }
252
253
    /**
     * Stores the number of processed jobs on the run, and saves it when the
     * run manager is datastore-backed.
     *
     * @param int $count number of jobs processed so far
     */
    protected function updateProcessed($count)
    {
        $run = $this->run;
        $run->setProcessed($count);

        if (!$this->runManager instanceof BaseRunManager) {
            return;
        }

        // NOTE(review): the same persist-and-flush sequence is repeated in recordHeartbeat() and runStart().
        $objectManager = $this->runManager->getObjectManager();
        $objectManager->persist($run);
        $objectManager->flush();
    }
265
266
    /**
     * Creates the run object for a new run and saves it when the run manager is datastore-backed.
     *
     * @param float    $start    start timestamp of the run, as returned by microtime(true)
     * @param int|null $maxCount maximum number of jobs; only stored on the run when not null
     * @param int|null $duration run length in seconds; only stored on the run when not null
     */
    protected function runStart($start, $maxCount = null, $duration = null)
    {
        $runClass = $this->runManager->getRunClass();
        $this->run = new $runClass();

        // Format the timestamp explicitly: a plain float-to-string cast drops the fraction for whole
        // numbers (and was locale dependent before PHP 8), which makes the 'U.u' parse return false.
        $startDate = is_numeric($start)
            ? \DateTime::createFromFormat('U.u', sprintf('%.6F', $start))
            : false;
        if (!$startDate) {
            // Never hand "false" to the run; getEndTime() later clones the start time.
            $startDate = new \DateTime();
        }

        $this->run->setLastHeartbeatAt($startDate);
        $this->run->setStartedAt($startDate);
        if (null !== $maxCount) {
            $this->run->setMaxCount($maxCount);
        }
        if (null !== $duration) {
            $this->run->setDuration($duration);
        }
        $this->run->setHostname(gethostname());
        $this->run->setPid(getmypid());
        $this->run->setProcessed(0);

        if ($this->runManager instanceof BaseRunManager) {
            // NOTE(review): the same persist-and-flush sequence is repeated in recordHeartbeat() and updateProcessed().
            $objectManager = $this->runManager->getObjectManager();
            $objectManager->persist($this->run);
            $objectManager->flush();
        }
    }
295
296
    /**
     * Closes the current run: records end time and elapsed time, removes the run from the
     * datastore when the run manager is datastore-backed, and logs a summary line.
     *
     * @param float $start start timestamp of the run, as returned by microtime(true)
     */
    protected function runStop($start)
    {
        $end = microtime(true);

        // Format explicitly so the 'U.u' parse also succeeds when the float has no fractional
        // part; otherwise the end time would silently not be recorded.
        $endedTime = \DateTime::createFromFormat('U.u', sprintf('%.6F', $end));
        if ($endedTime) {
            $this->run->setEndedAt($endedTime);
        }
        $this->run->setElapsed($end - $start);

        if ($this->runManager instanceof BaseRunManager) {
            // The run is removed here, not persisted.
            $objectManager = $this->runManager->getObjectManager();
            $objectManager->remove($this->run);
            $objectManager->flush();
        }

        $this->log('info', 'Ended with '.$this->run->getProcessed().' job(s) processed over '.strval($this->run->getElapsed()).' seconds.');
    }
314
315
    /**
     * Logs the outcome of a job.
     *
     * A job in error status gets an additional "error" entry carrying its message;
     * the "Finished job" info entry is written in every case.
     *
     * @param Job $job the job that was just run
     */
    protected function reportJob(Job $job)
    {
        if (BaseJob::STATUS_ERROR == $job->getStatus()) {
            $this->log('error', "Error with job id: {$job->getId()}\n".$job->getMessage());
        }

        $this->log('info', "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n");
    }
328
329
    /**
     * Writes a message to the logger if one is set, otherwise to the console output if one is set.
     *
     * With neither a logger nor an output registered the message is dropped.
     *
     * @param string $level   name of the logger method to call (e.g. "info", "error", "critical")
     * @param string $msg     the message
     * @param array  $context extra data; passed to the logger, or dumped with print_r() on the console
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logger) {
            $this->logger->$level($msg, $context);

            return;
        }

        if (!$this->output) {
            return;
        }

        $timestamp = (new \DateTime())->format('c');
        $this->output->write("[$level] [".$timestamp.'] '.$msg);
        if ($context) {
            $this->output->write(print_r($context, true));
        }
        $this->output->writeln('');
    }
349
}
350