Completed
Push — master ( b7b31c...0ef8ec )
by Matthew
04:34
created

Loop   B

Complexity

Total Complexity 47

Size/Duplication

Total Lines 329
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 81.45%

Importance

Changes 0
Metric Value
wmc 47
lcom 1
cbo 9
dl 0
loc 329
ccs 101
cts 124
cp 0.8145
rs 8.439
c 0
b 0
f 0

19 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 9 1
A getLastRun() 0 4 1
A getProcessTimeout() 0 4 1
A setProcessTimeout() 0 4 1
A setLogger() 0 4 1
A setOutput() 0 4 1
B runJobById() 0 26 3
B runLoop() 0 26 3
A checkParameters() 0 9 1
B validateMaxCountDuration() 0 9 5
A validateNanoSleep() 0 6 2
A getEndTime() 0 11 2
B runCurrentJob() 0 18 5
B checkMaxCountDuration() 0 11 6
A isFinished() 0 11 3
A isFinishedJobs() 0 12 3
A isFinishedEndTime() 0 9 2
A reportJob() 0 12 2
A log() 0 17 4

How to fix   Complexity   

Complex Class

Complex classes like Loop often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes. You can also look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Loop, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace Dtc\QueueBundle\Run;
4
5
use Dtc\QueueBundle\Doctrine\BaseJobManager;
6
use Dtc\QueueBundle\Model\BaseJob;
7
use Dtc\QueueBundle\Model\Job;
8
use Dtc\QueueBundle\Model\JobManagerInterface;
9
use Dtc\QueueBundle\Model\Run;
10
use Dtc\QueueBundle\Model\RunManager;
11
use Dtc\QueueBundle\Model\WorkerManager;
12
use Dtc\QueueBundle\Util\Util;
13
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
14
use Psr\Log\LoggerInterface;
15
use Symfony\Component\Console\Output\OutputInterface;
16
17
class Loop
18
{
19
    /** @var OutputInterface */
20
    protected $output;
21
22
    /** @var LoggerInterface */
23
    protected $logger;
24
25
    /** @var WorkerManager */
26
    protected $workerManager;
27
28
    /** @var JobManagerInterface */
29
    protected $jobManager;
30
31
    /** @var RunManager */
32
    protected $runManager;
33
34
    /** @var int */
35
    protected $processTimeout;
36
37
    /** @var Run */
38
    protected $lastRun;
39
40 3
    /**
     * Stores the three managers this loop delegates to.
     *
     * @param WorkerManager       $workerManager used to run jobs and to receive this object's log() as logging callback
     * @param JobManagerInterface $jobManager    used by runJobById() to look a job up through its repository
     * @param RunManager          $runManager    used to start/stop runs and to record heartbeats and progress
     */
    public function __construct(
        WorkerManager $workerManager,
        JobManagerInterface $jobManager,
        RunManager $runManager
    ) {
        $this->runManager = $runManager;
        $this->jobManager = $jobManager;
        $this->workerManager = $workerManager;
    }
49
50
    /**
     * Returns the run object created by the most recent runJobById() or runLoop() call.
     *
     * @return Run|null null as long as neither of those methods has started a run
     */
    public function getLastRun()
    {
        return $this->lastRun;
    }
57
58
    /**
     * Returns the process timeout that is handed to the run manager when a run starts.
     *
     * @return int|null null when setProcessTimeout() has not been called
     */
    public function getProcessTimeout()
    {
        return $this->processTimeout;
    }
65
66
    /**
     * Sets the process timeout that runJobById() and runLoop() pass to RunManager::runStart().
     *
     * @param int $processTimeout timeout value; unit is not visible here (presumably seconds — confirm against RunManager)
     */
    public function setProcessTimeout(int $processTimeout)
    {
        $this->processTimeout = $processTimeout;
    }
73
74
    /**
     * Registers the logger used by log(). When a logger is set, log() writes
     * to it exclusively and ignores any console output.
     *
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
78
79 1
    /**
     * Registers the console output used by log() as a fallback when no logger
     * has been set.
     *
     * @param OutputInterface $output
     */
    public function setOutput(OutputInterface $output)
    {
        $this->output = $output;
    }
83
84
    /**
     * Runs exactly one job, looked up by its id, inside its own run record.
     *
     * A run is started first and remembered as the "last run". The run is
     * stopped again on every exit path of this method that it controls:
     * unsupported job manager, unknown job id, and normal completion.
     *
     * @param float $start start time value forwarded to RunManager::runStart() and runStop()
     * @param mixed $jobId identifier handed to the job repository's find()
     *
     * @throws ClassNotSubclassException when the job manager is not a BaseJobManager
     *                                   and therefore offers no repository lookup
     */
    public function runJobById($start, $jobId)
    {
        $run = $this->runManager->runStart($start, null, null, $this->processTimeout);
        $this->lastRun = $run;

        if (!$this->jobManager instanceof BaseJobManager) {
            // Stop the run before bailing out, the same way the job-not-found
            // branch below does, so the run started above is not left open.
            $this->runManager->runStop($run, $start);

            throw new ClassNotSubclassException("Can't get job by id when not using a database/datastore backed queue (such as mongodb or an RDBMS)");
        }

        /** @var Job $job */
        $job = $this->jobManager->getRepository()->find($jobId);
        if (!$job) {
            $this->log('error', "Job id is not found: {$jobId}");
            $this->runManager->runStop($run, $start);

            return;
        }

        $job = $this->workerManager->runJob($job);
        $this->reportJob($job);
        $run->setProcessed(1);
        $this->runManager->runStop($run, $start);
        $this->log('info', 'Ended with 1 job processed over '.strval($run->getElapsed()).' seconds.');
    }
113
114
    /**
     * Repeatedly asks the worker manager for a job and processes it until
     * isFinished() reports that the loop should end.
     *
     * The parameters are validated first (which may throw), this object's log()
     * is registered as the worker manager's logging callback, and a run is
     * started and remembered as the "last run". Any \Exception raised while
     * looping is logged at 'critical' level and ends the loop; the run is then
     * stopped and a summary is logged.
     *
     * @param float    $start      start time value forwarded to the run manager
     * @param mixed    $workerName forwarded unchanged to WorkerManager::run()
     * @param mixed    $methodName forwarded unchanged to WorkerManager::run()
     * @param null|int $maxCount   upper bound on the job counter, or null to rely on $duration only
     * @param null|int $duration   run length in seconds used to compute the end time, or null to rely on $maxCount only
     * @param int      $nanoSleep  upper bound (nanoseconds) of the random pause taken when no job was available
     *
     * @return int always 0
     *
     * @throws \InvalidArgumentException when $nanoSleep is null, or when $maxCount and $duration
     *                                   are both null or both 0
     */
    public function runLoop($start, $workerName, $methodName, $maxCount, $duration = null, $nanoSleep = 500000000)
    {
        $this->checkParameters($nanoSleep, $maxCount, $duration);
        $this->workerManager->setLoggingFunc([$this, 'log']);
        $run = $this->runManager->runStart($start, $maxCount, $duration, $this->processTimeout);
        $this->lastRun = $run;
        try {
            $this->log('info', 'Starting up a new job...');

            $endTime = $this->getEndTime($run, $duration);
            // 1-based counter: the number of the job about to be processed.
            $currentJob = 1;
            $noMoreJobsToRun = false;
            do {
                $job = $this->workerManager->run($workerName, $methodName, true, $run->getId());
                $this->runManager->recordHeartbeat($run, $start, $job);
                // Updates $noMoreJobsToRun and $currentJob by reference.
                $this->runCurrentJob($run, $job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
            } while (!$this->isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun));
        } catch (\Exception $e) {
            // Uncaught error: possibly with QueueBundle itself
            $this->log('critical', $e->getMessage(), $e->getTrace());
        }
        $this->runManager->runStop($run, $start);
        $this->log('info', 'Ended with '.$run->getProcessed().' job(s) processed over '.strval($run->getElapsed()).' seconds.');

        return 0;
    }
146
147
    /**
     * Normalises and validates the loop parameters in place.
     *
     * Each argument is passed by reference and overwritten with the result of
     * Util::validateIntNull(); afterwards the nanoSleep and maxCount/duration
     * rules are enforced.
     *
     * @param int      $nanoSleep replaced by its validated value; must not end up null
     * @param null|int $maxCount  replaced by its validated value
     * @param null|int $duration  replaced by its validated value
     *
     * @throws \InvalidArgumentException when $nanoSleep is null, or when $maxCount and $duration
     *                                   are both null or both 0
     */
    private function checkParameters(&$nanoSleep, &$maxCount, &$duration)
    {
        // The third argument is presumably a maximum bit width — confirm against Util::validateIntNull().
        $maxCount = Util::validateIntNull('maxCount', $maxCount, 32);
        $duration = Util::validateIntNull('duration', $duration, 32);
        $nanoSleep = Util::validateIntNull('nanoSleep', $nanoSleep, 63);

        $this->validateNanoSleep($nanoSleep);
        $this->validateMaxCountDuration($maxCount, $duration);
    }
163
164
    /**
     * Rejects the two parameter combinations that give the loop no usable
     * limit: both values 0, or both values null.
     *
     * @param int|null $maxCount
     * @param int|null $duration
     *
     * @throws \InvalidArgumentException when both arguments are 0 or both are null
     */
    protected function validateMaxCountDuration($maxCount, $duration)
    {
        // Only an identical pair can be "both 0" or "both null".
        if ($maxCount !== $duration) {
            return;
        }

        if (0 === $maxCount) {
            throw new \InvalidArgumentException('maxCount and duration can not both be 0');
        }
        if (null === $maxCount) {
            throw new \InvalidArgumentException('maxCount and duration can not both be null');
        }
    }
179
180
    /**
     * Ensures a nanoSleep value was supplied.
     *
     * @param int|null $nanoSleep
     *
     * @throws \InvalidArgumentException when $nanoSleep is null
     */
    protected function validateNanoSleep($nanoSleep)
    {
        if (null !== $nanoSleep) {
            return;
        }

        throw new \InvalidArgumentException("nanoSleep can't be null");
    }
191
192
    /**
     * Computes the moment at which a duration-limited run has to end.
     *
     * @param Run      $run      run whose start time is the base of the calculation
     * @param int|null $duration run length in seconds, or null for "no time limit"
     *
     * @return null|\DateTime a copy of the run's start time advanced by $duration seconds,
     *                        or null when $duration is null
     */
    protected function getEndTime(Run $run, $duration)
    {
        if (null === $duration) {
            return null;
        }

        // "{$var}" instead of "${var}": the latter interpolation form is deprecated as of PHP 8.2.
        $interval = new \DateInterval("PT{$duration}S");
        // Clone so that adding the interval does not modify the run's own start time.
        $endTime = clone $run->getStartedAt();
        $endTime->add($interval);

        return $endTime;
    }
208
209
    /**
     * Handles the outcome of one attempt to fetch and run a job.
     *
     * When a job was run it is reported, the run's processed count is updated
     * and the job counter advances. When no job was available, a one-time
     * "no more jobs" message is logged and, for duration-limited runs, the
     * loop pauses for a random time of up to $nanoSleep nanoseconds.
     *
     * @param Run      $run
     * @param Job|null $job             the job that was just run, or null when none was available
     * @param bool     $noMoreJobsToRun by reference: reset to false after a job, set to true when none was available
     * @param int      $currentJob      by reference: 1-based counter, incremented after each processed job
     * @param int|null $duration        when null, no pause is taken after an empty fetch
     * @param int      $nanoSleep       upper bound of the random pause, in nanoseconds
     */
    protected function runCurrentJob($run, $job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
    {
        if (null !== $job) {
            $noMoreJobsToRun = false;
            $this->reportJob($job);
            $this->runManager->updateProcessed($run, $currentJob);
            ++$currentJob;

            return;
        }

        if (!$noMoreJobsToRun) {
            // Log only on the first empty fetch of a streak.
            $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
            $noMoreJobsToRun = true;
        }

        if (null !== $duration) {
            $nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep);
            // time_nanosleep() only accepts a nanoseconds argument below one
            // billion, so whole seconds are carried into its first argument.
            time_nanosleep(intdiv($nanoSleepTime, 1000000000), $nanoSleepTime % 1000000000);
        }
    }
235
236
    /**
     * Adjusts run limits in place before a run is started.
     *
     * - When both a duration and a process timeout are given and the duration
     *   is not smaller than the timeout, the timeout is doubled (and the
     *   change is logged).
     * - When neither a max count nor a duration is given, max count becomes 1.
     *
     * @param int|null $maxCount       by reference: set to 1 when it and $duration are both null
     * @param int|null $duration       by reference (never modified here)
     * @param int|null $processTimeout by reference: doubled when $duration >= $processTimeout
     */
    public function checkMaxCountDuration(&$maxCount, &$duration, &$processTimeout)
    {
        $haveBoth = null !== $duration && null !== $processTimeout;
        if ($haveBoth && $duration >= $processTimeout) {
            $doubled = 2 * $processTimeout;
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".$doubled);
            $processTimeout = $doubled;
        }

        $noLimitGiven = null === $maxCount && null === $duration;
        if ($noLimitGiven) {
            $maxCount = 1;
        }
    }
252
253
    /**
     * Determine if the run loop is finished.
     *
     * The loop is finished as soon as the job counter has moved past
     * $maxCount (when one is given). Otherwise the decision is left to
     * isFinishedJobs(), which accepts a null end time; this avoids handing
     * null to isFinishedEndTime(), whose parameter is a non-nullable \DateTime.
     *
     * @param int|null       $maxCount        upper bound for the job counter, or null for none
     * @param \DateTime|null $endTime         moment the run has to end, or null for none
     * @param int            $currentJob      1-based number of the next job to process
     * @param bool           $noMoreJobsToRun whether the last fetch returned no job
     *
     * @return bool
     */
    protected function isFinished($maxCount, $endTime, $currentJob, $noMoreJobsToRun)
    {
        if (null !== $maxCount && $currentJob > $maxCount) {
            return true;
        }

        // With an end time this is a pure time check; without one the loop
        // ends once no more jobs are available.
        return $this->isFinishedJobs($endTime, $noMoreJobsToRun);
    }
274
275
    /**
     * Decides whether the loop is finished while the max-count limit has not
     * been reached yet.
     *
     * @param \DateTime|null $endTime         moment the run has to end, or null when only a max count applies
     * @param bool           $noMoreJobsToRun whether the last fetch returned no job
     *
     * @return bool with an end time: whether that time has been reached;
     *              without one: whether the queue ran out of jobs
     */
    protected function isFinishedJobs($endTime, $noMoreJobsToRun)
    {
        if (null !== $endTime) {
            return $this->isFinishedEndTime($endTime);
        }

        // No end time means that there is a $maxCount, as one or the other is forced to be not null.
        return (bool) $noMoreJobsToRun;
    }
293
294
    /**
     * Tells whether the given end time has been reached.
     *
     * @param \DateTime $endTime
     *
     * @return bool false while $endTime is still later than the current time, true otherwise
     */
    protected function isFinishedEndTime(\DateTime $endTime)
    {
        $now = new \DateTime();

        return !($endTime > $now);
    }
308
309
    /**
     * Records a finished job with the run manager and logs its outcome.
     *
     * A job in error status produces an additional 'error' log entry carrying
     * the job's message; the 'info' entry with the elapsed time is written for
     * every job.
     *
     * @param Job $job the job that has just been run
     */
    protected function reportJob(Job $job)
    {
        $this->runManager->recordJobRun($job);

        $failed = (BaseJob::STATUS_ERROR == $job->getStatus());
        if ($failed) {
            $this->log('error', "Error with job id: {$job->getId()}\n".$job->getMessage());
        }

        $this->log('info', "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n");
    }
324
325
    /**
     * Writes a message to the configured logger or, failing that, to the
     * configured console output. Does nothing when neither is set.
     *
     * @param string $level   used as the method name called on the logger (e.g. 'info', 'error',
     *                        'critical') and as the "[level]" prefix on console output
     * @param mixed  $msg     message text
     * @param array  $context extra data: passed to the logger, or dumped with print_r() on console output
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logger) {
            // A logger takes precedence; console output is skipped entirely.
            $this->logger->{$level}($msg, $context);

            return;
        }

        if (!$this->output) {
            return;
        }

        $timestamp = (new \DateTime())->format('c');
        $this->output->write("[$level] [".$timestamp.'] '.$msg);
        if (!empty($context)) {
            $this->output->write(print_r($context, true));
        }
        $this->output->writeln('');
    }
345
}
346