Completed
Push — master ( 824463...2e93e9 )
by Matthew
05:21
created

RunCommand::runLoop()   B

Complexity

Conditions 3
Paths 8

Size

Total Lines 25
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
dl 0
loc 25
ccs 0
cts 22
cp 0
rs 8.8571
c 0
b 0
f 0
cc 3
eloc 19
nc 8
nop 6
crap 12
1
<?php
2
3
namespace Dtc\QueueBundle\Command;
4
5
use Doctrine\Common\Persistence\ObjectManager;
6
use Dtc\QueueBundle\Model\BaseJob;
7
use Dtc\QueueBundle\Model\Job;
8
use Dtc\QueueBundle\Model\Run;
9
use Psr\Log\LoggerInterface;
10
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
11
use Symfony\Component\Console\Input\InputArgument;
12
use Symfony\Component\Console\Input\InputInterface;
13
use Symfony\Component\Console\Input\InputOption;
14
use Symfony\Component\Console\Output\OutputInterface;
15
16
class RunCommand extends ContainerAwareCommand
17
{
18
    /** @var ObjectManager */
19
    protected $runManager;
20
21
    /** @var Run $run */
22
    protected $run;
23
24
    /** @var string */
25
    protected $runClass;
26
27
    /** @var OutputInterface */
28
    protected $output;
29
30
    /** @var LoggerInterface */
31
    protected $logger;
32
33
    /**
     * Declares the command name, its arguments and options, and its description.
     */
    protected function configure()
    {
        $this->setName('dtc:queue:run');

        $definition = [
            new InputArgument('worker_name', InputArgument::OPTIONAL, 'Name of worker', null),
            new InputArgument('method', InputArgument::OPTIONAL, 'DI method of worker', null),
            new InputOption('id', 'i', InputOption::VALUE_REQUIRED, 'Id of Job to run', null),
            new InputOption(
                'max_count',
                'm',
                InputOption::VALUE_REQUIRED,
                'Maximum number of jobs to work on before exiting',
                null
            ),
            new InputOption('duration', 'd', InputOption::VALUE_REQUIRED, 'Duration to run for in seconds', null),
            new InputOption(
                'timeout',
                't',
                InputOption::VALUE_REQUIRED,
                'Process timeout in seconds (hard exit of process regardless)',
                3600
            ),
            new InputOption(
                'nano_sleep',
                's',
                InputOption::VALUE_REQUIRED,
                'If using duration, this is the time to sleep when there\'s no jobs in nanoseconds',
                500000000
            ),
            // No default is given for the logger option.
            new InputOption(
                'logger',
                'l',
                InputOption::VALUE_REQUIRED,
                'Log using the logger service specified, or output to console if null (or an invalid logger service id) is passed in'
            ),
        ];

        $this->setDefinition($definition);
        $this->setDescription('Start up a job in queue');
    }
86
87
    /**
     * Runs a single job, looked up by id, inside its own Run record.
     *
     * @param float $start Command start time as returned by microtime(true)
     * @param mixed $jobId Identifier handed to the job repository's find()
     *
     * @return int 0 when the job was found and run, 1 when no job matches the id
     */
    protected function runJobById($start, $jobId)
    {
        $this->runStart($start);
        $container = $this->getContainer();
        $jobManager = $container->get('dtc_queue.job_manager');
        $workerManager = $container->get('dtc_queue.worker_manager');

        $job = $jobManager->getRepository()->find($jobId);
        if (!$job) {
            $this->log('error', "Job id is not found: {$jobId}");
            $this->runStop($start);

            // execute() returns this value as the command's exit code, so a
            // missing job must not be reported as success.
            return 1;
        }

        $job = $workerManager->runJob($job);
        $this->reportJob($job);
        $this->run->setProcessed(1);
        $this->runStop($start);

        return 0;
    }
112
113
    /**
     * Validates an optional positive-integer option value.
     *
     * @param string          $varName Option name, used in the error messages
     * @param string|int|null $var     Raw value; null means the option was not supplied
     * @param int             $pow     Exclusive upper bound as a power of two (value must be < 2^$pow)
     *
     * @return int|null null when $var is null, otherwise the value as an int
     *
     * @throws \InvalidArgumentException when the value is not a plain base 10 integer in (0, 2^$pow)
     */
    private function validateIntNull($varName, $var, $pow)
    {
        if (null === $var) {
            return null;
        }

        $raw = strval($var);
        if (!ctype_digit($raw)) {
            throw new \InvalidArgumentException("$varName must be an integer");
        }

        $value = intval($raw);
        // The string round-trip rejects leading zeros and values too large for a native int.
        if (strval($value) !== $raw || $value <= 0 || $value >= pow(2, $pow)) {
            throw new \InvalidArgumentException("$varName must be a base 10 integer greater than 0 and less than 2^$pow");
        }

        return $value;
    }
132
133
    /**
     * Sends a message to the configured logger, or prints it to the console
     * output when no logger has been set.
     *
     * @param string $level   Name of the logger method to call; also printed as the console prefix
     * @param string $msg     Message text
     * @param array  $context Extra data; dumped with print_r() when writing to the console
     */
    public function log($level, $msg, array $context = [])
    {
        if (!$this->logger) {
            $timestamp = (new \DateTime())->format('c');
            $this->output->write("[$level] [".$timestamp.'] '.$msg);
            if ([] !== $context) {
                $this->output->write(print_r($context, true));
            }
            $this->output->writeln('');

            return;
        }

        $this->logger->$level($msg, $context);
    }
151
152
    /**
     * Command entry point: reads and validates the options, then either runs
     * one job by id or enters the job-processing loop.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output Kept on the instance so log() can write to the console
     *
     * @return int|null Whatever runJobById() / runLoop() returns, or 1 when max_count or duration is 0
     *
     * @throws \Exception when a numeric option fails validateIntNull()
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $start = microtime(true);
        $this->output = $output;
        $container = $this->getContainer();
        $workerName = $input->getArgument('worker_name');
        $methodName = $input->getArgument('method');
        $maxCount = $input->getOption('max_count');
        $duration = $input->getOption('duration');
        $processTimeout = $input->getOption('timeout');
        $nanoSleep = $input->getOption('nano_sleep');
        $loggerService = $input->getOption('logger');

        // The logger option has no default, so the id is null unless supplied;
        // only ask the container about a real service id.
        if (null !== $loggerService && $container->has($loggerService)) {
            $this->logger = $container->get($loggerService);
        }

        $maxCount = $this->validateIntNull('max_count', $maxCount, 32);
        $duration = $this->validateIntNull('duration', $duration, 32);
        $processTimeout = $this->validateIntNull('timeout', $processTimeout, 32);
        $nanoSleep = $this->validateIntNull('nano_sleep', $nanoSleep, 63);

        if (null !== $duration && null !== $processTimeout && $duration >= $processTimeout) {
            $this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".(2 * $processTimeout));
            $processTimeout *= 2;
        }

        // With neither limit given, process exactly one job.
        if (null === $maxCount && null === $duration) {
            $maxCount = 1;
        }

        // Defensive checks: validateIntNull() already rejects values <= 0.
        if (0 === $maxCount) {
            $this->log('error', 'max_count set to 0');

            return 1;
        }

        if (0 === $duration) {
            $this->log('error', 'duration set to 0');

            return 1;
        }

        set_time_limit($processTimeout); // Set timeout on the process

        if ($jobId = $input->getOption('id')) {
            return $this->runJobById($start, $jobId); // Run a single job
        }

        return $this->runLoop($start, $workerName, $methodName, $nanoSleep, $maxCount, $duration);
    }
204
205
    /**
     * Repeatedly asks the worker manager for a job until isFinished() says to
     * stop, recording a heartbeat on the Run before each attempt.
     *
     * @param float       $start      Command start time as returned by microtime(true)
     * @param string|null $workerName Passed through to the worker manager's run()
     * @param string|null $methodName Passed through to the worker manager's run()
     * @param null|int    $nanoSleep  Upper bound (nanoseconds) of the idle sleep
     * @param null|int    $maxCount   Maximum number of jobs to process, if any
     * @param null|int    $duration   Maximum run time in seconds, if any
     *
     * @return int 0 when the loop finished normally, 1 when it was aborted by an exception
     */
    protected function runLoop($start, $workerName, $methodName, $nanoSleep, $maxCount, $duration)
    {
        $container = $this->getContainer();
        $workerManager = $container->get('dtc_queue.worker_manager');
        $workerManager->setLoggingFunc([$this, 'log']);
        $this->runStart($start, $maxCount, $duration);

        $exitCode = 0;
        try {
            $this->log('info', 'Starting up a new job...');

            $endTime = $this->getEndTime($duration);
            $currentJob = 1;
            $noMoreJobsToRun = false;
            do {
                $this->recordHeartbeat($start);
                $job = $workerManager->run($workerName, $methodName, true, $this->run->getId());
                // $noMoreJobsToRun and $currentJob are updated by reference.
                $this->runCurrentJob($job, $noMoreJobsToRun, $currentJob, $duration, $nanoSleep);
            } while (!$this->isFinished($maxCount, $duration, $currentJob, $endTime, $noMoreJobsToRun));
        } catch (\Exception $e) {
            // Uncaught error: possibly with QueueBundle itself
            $this->log('critical', $e->getMessage(), $e->getTrace());
            // Report the failure through the exit code instead of claiming success.
            $exitCode = 1;
        }
        // Always close out the Run, even after a failure.
        $this->runStop($start);

        return $exitCode;
    }
236
237
    /**
     * Computes the moment at which a duration-limited run must stop.
     *
     * @param int|null $duration Run length in seconds, or null for no time limit
     *
     * @return null|\DateTime The Run's start time plus $duration seconds, or null when $duration is null
     */
    protected function getEndTime($duration)
    {
        if (null === $duration) {
            return null;
        }

        $interval = new \DateInterval("PT{$duration}S");

        // DateTime::add() modifies the object it is called on. Work on a copy so
        // the value returned by getStartedAt() is left untouched (presumably it
        // is the instance the Run itself holds -- confirm against the Run model).
        $endTime = clone $this->run->getStartedAt();

        return $endTime->add($interval);
    }
252
253
    /**
     * Handles the outcome of one loop iteration: reports and counts a job that
     * ran, or notes that the queue is empty and (for duration-limited runs)
     * sleeps for a random interval before the next attempt.
     *
     * @param Job|null $job             Job returned by the worker manager, or a falsy value when there was none
     * @param bool     $noMoreJobsToRun By reference: set to true once an empty queue was seen, false after a job ran
     * @param int      $currentJob      By reference: 1-based number of the job being processed; incremented after a job ran
     * @param int|null $duration        Run length in seconds; sleeping only happens when this is not null
     * @param int      $nanoSleep       Upper bound of the random sleep, in nanoseconds
     */
    protected function runCurrentJob($job, &$noMoreJobsToRun, &$currentJob, $duration, $nanoSleep)
    {
        if ($job) {
            $noMoreJobsToRun = false;
            $this->reportJob($job);
            $this->updateProcessed($currentJob);
            ++$currentJob;

            return;
        }

        // Only log the "empty queue" message once per idle stretch.
        if (!$noMoreJobsToRun) {
            $this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).');
            $noMoreJobsToRun = true;
        }

        if (null === $duration) {
            return;
        }

        $maxSleep = max(0, (int) $nanoSleep);
        $sleepFor = function_exists('random_int') ? random_int(0, $maxSleep) : mt_rand(0, $maxSleep);

        // time_nanosleep() requires the nanoseconds argument to be below one
        // billion, so carry whole seconds over into the seconds argument.
        $nanoseconds = $sleepFor % 1000000000;
        $seconds = (int) (($sleepFor - $nanoseconds) / 1000000000);
        time_nanosleep($seconds, $nanoseconds);
    }
278
279
    /**
     * Decides whether the run loop should stop.
     *
     * The loop stops when the job counter has passed $maxCount; otherwise,
     * without a duration, it stops as soon as the queue was found empty, and
     * with a duration it stops once $endTime has been reached.
     *
     * @param int|null       $maxCount        Maximum number of jobs, or null for no limit
     * @param int|null       $duration        Run length in seconds, or null for no time limit
     * @param int            $currentJob      1-based number of the next job
     * @param \DateTime|null $endTime         Deadline used when $duration is not null
     * @param bool           $noMoreJobsToRun Whether the last attempt found no job
     *
     * @return bool
     */
    protected function isFinished($maxCount, $duration, $currentJob, $endTime, $noMoreJobsToRun)
    {
        $countExceeded = null !== $maxCount && $currentJob > $maxCount;
        if ($countExceeded) {
            return true;
        }

        // No duration means a $maxCount is in force (one of the two is always set),
        // so the only other reason to stop is an empty queue.
        if (null === $duration) {
            return (bool) $noMoreJobsToRun;
        }

        $now = new \DateTime();

        return !($now < $endTime);
    }
306
307
    /**
     * Stamps the current Run with a fresh heartbeat time and elapsed time,
     * and saves it when a run manager is available.
     *
     * @param float $start Command start time as returned by microtime(true)
     */
    protected function recordHeartbeat($start)
    {
        $run = $this->run;
        $run->setLastHeartbeatAt(new \DateTime());
        $run->setElapsed(microtime(true) - $start);

        if (!$this->runManager) {
            return;
        }

        $this->runManager->persist($run);
        $this->runManager->flush();
    }
319
320
    /**
     * Stores the processed-job count on the current Run and saves it when a
     * run manager is available.
     *
     * @param int $count Number of jobs processed so far
     */
    protected function updateProcessed($count)
    {
        $run = $this->run;
        $run->setProcessed($count);

        if (!$this->runManager) {
            return;
        }

        $this->runManager->persist($run);
        $this->runManager->flush();
    }
331
332
    /**
     * Reads the Run class name, picks the run manager (document or entity
     * manager) that matches the configured default manager when that service
     * exists, and creates the Run record.
     *
     * @param float    $start    Command start time as returned by microtime(true)
     * @param int|null $maxCount Job limit to record on the Run, if any
     * @param int|null $duration Time limit (seconds) to record on the Run, if any
     */
    protected function runStart($start, $maxCount = null, $duration = null)
    {
        $container = $this->getContainer();
        $this->runClass = $container->getParameter('dtc_queue.class_run');
        $defaultManager = $container->getParameter('dtc_queue.default_manager');

        // Checked in order; the first manager type that matches and whose service exists wins.
        $managerServices = [
            'mongodb' => 'dtc_queue.document_manager',
            'orm' => 'dtc_queue.entity_manager',
        ];
        foreach ($managerServices as $managerType => $serviceId) {
            if ($managerType == $defaultManager && $container->has($serviceId)) {
                $this->runManager = $container->get($serviceId);
                break;
            }
        }

        $this->createRun($start, $duration, $maxCount);
    }
351
352
    /**
     * Instantiates the Run for this process, fills in its start data, and
     * saves it when a run manager is available.
     *
     * @param float    $start    Command start time as returned by microtime(true)
     * @param int|null $duration Time limit in seconds; only recorded when not null
     * @param int|null $maxCount Job limit; only recorded when not null
     */
    protected function createRun($start, $duration, $maxCount)
    {
        $this->run = new $this->runClass();

        // Format the float explicitly: a plain string cast can omit the
        // fractional part, in which case the 'U.u' format fails to parse.
        $startDate = \DateTime::createFromFormat('U.u', sprintf('%.6F', $start));
        if (!$startDate) {
            // Never hand a non-date to the Run; fall back to the current time.
            $startDate = new \DateTime();
        }

        // Separate instances, so changing one date later cannot alter the other.
        $this->run->setLastHeartbeatAt(clone $startDate);
        $this->run->setStartedAt($startDate);
        if (null !== $maxCount) {
            $this->run->setMaxCount($maxCount);
        }
        if (null !== $duration) {
            $this->run->setDuration($duration);
        }
        $this->run->setHostname(gethostname());
        $this->run->setPid(getmypid());
        $this->run->setProcessed(0);
        if ($this->runManager) {
            $this->runManager->persist($this->run);
            $this->runManager->flush();
        }
    }
377
378
    /**
     * Closes out the current Run: records end time and elapsed time, removes
     * the Run through the run manager when one is available, and logs a summary.
     *
     * @param float $start Command start time as returned by microtime(true)
     */
    protected function runStop($start)
    {
        $end = microtime(true);

        // Format the float explicitly: a plain string cast can omit the
        // fractional part, in which case the 'U.u' format fails to parse.
        $endTime = \DateTime::createFromFormat('U.u', sprintf('%.6F', $end));
        if ($endTime) {
            $this->run->setEndedAt($endTime);
        }
        $this->run->setElapsed($end - $start);
        if ($this->runManager) {
            $this->runManager->remove($this->run);
            $this->runManager->flush();
        }
        $this->log('info', 'Ended with '.$this->run->getProcessed().' job(s) processed over '.strval($this->run->getElapsed()).' seconds.');
    }
392
393
    /**
     * Logs the outcome of a job: an error entry when its status is
     * BaseJob::STATUS_ERROR, followed in every case by a "finished" entry.
     *
     * @param Job $job The job that was just run
     */
    protected function reportJob(Job $job)
    {
        $failed = BaseJob::STATUS_ERROR == $job->getStatus();
        if ($failed) {
            $this->log('error', "Error with job id: {$job->getId()}\n".$job->getMessage());
        }

        $this->log('info', "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n");
    }
403
}
404