RunCommand::checkRunningJobs()   B
last analyzed

Complexity

Conditions 11
Paths 81

Size

Total Lines 75
Code Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 43
nc 81
nop 0
dl 0
loc 75
rs 7.3166
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/** @noinspection PhpComposerExtensionStubsInspection */
4
5
declare(strict_types=1);
6
7
namespace Setono\SyliusSchedulerPlugin\Command;
8
9
use Doctrine\ORM\EntityManager;
10
use Setono\SyliusSchedulerPlugin\Doctrine\ORM\JobRepository;
11
use Setono\SyliusSchedulerPlugin\Event\NewOutputEvent;
12
use Setono\SyliusSchedulerPlugin\Event\StateChangeEvent;
13
use Setono\SyliusSchedulerPlugin\Exception\InvalidArgumentException;
14
use Setono\SyliusSchedulerPlugin\JobManager\JobManager;
15
use Setono\SyliusSchedulerPlugin\Model\JobInterface;
16
use Setono\SyliusSchedulerPlugin\SetonoSyliusSchedulerPluginEvent;
17
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
18
use Symfony\Component\Console\Input\InputInterface;
19
use Symfony\Component\Console\Input\InputOption;
20
use Symfony\Component\Console\Output\OutputInterface;
21
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
22
use Symfony\Component\Process\Exception\ProcessFailedException;
23
use Symfony\Component\Process\Process;
24
25
class RunCommand extends ContainerAwareCommand
0 ignored issues
show
Deprecated Code introduced by
The class Symfony\Bundle\Framework...d\ContainerAwareCommand has been deprecated: since Symfony 4.2, use {@see Command} instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

25
class RunCommand extends /** @scrutinizer ignore-deprecated */ ContainerAwareCommand
Loading history...
26
{
27
    protected static $defaultName = 'setono:scheduler:run';
28
29
    /**
30
     * @var EntityManager
31
     */
32
    private $entityManager;
33
34
    /**
35
     * @var EventDispatcherInterface
36
     */
37
    private $eventDispatcher;
38
39
    /**
40
     * @var JobManager
41
     */
42
    private $jobManager;
43
44
    /**
45
     * @var JobRepository
46
     */
47
    private $jobRepository;
48
49
    /**
50
     * @var string
51
     */
52
    private $env;
53
54
    /**
55
     * @var bool
56
     */
57
    private $verbose;
58
59
    /**
60
     * @var OutputInterface
61
     */
62
    private $output;
63
64
    /**
65
     * @var array
66
     */
67
    private $runningJobs = [];
68
69
    /**
70
     * @var bool
71
     */
72
    private $shouldShutdown = false;
73
74
    /**
     * Wires the collaborators this worker relies on.
     *
     * @param JobManager $jobManager Used to close jobs once they reach a final state
     * @param JobRepository $jobRepository Used to look up startable and stale jobs
     * @param EntityManager $entityManager Used to persist, flush and refresh job entities
     * @param EventDispatcherInterface $eventDispatcher Used to publish state-change and new-output events
     */
    public function __construct(
        JobManager $jobManager,
        JobRepository $jobRepository,
        EntityManager $entityManager,
        EventDispatcherInterface $eventDispatcher
    ) {
        // Persistence-related collaborators.
        $this->entityManager = $entityManager;
        $this->jobRepository = $jobRepository;

        // Job lifecycle collaborators.
        $this->jobManager = $jobManager;
        $this->eventDispatcher = $eventDispatcher;

        parent::__construct();
    }
93
94
    /**
     * Declares the command description and the command line options of the worker.
     *
     * Numeric options are declared with string defaults; they are cast to
     * integers when they are read.
     *
     * @noinspection ReturnTypeCanBeDeclaredInspection
     */
    protected function configure()
    {
        $this->setDescription('Runs jobs from the queue.');

        // Limits of the worker itself.
        $this->addOption('max-runtime', 'r', InputOption::VALUE_REQUIRED, 'The maximum runtime in seconds.', '900');
        $this->addOption('max-concurrent-jobs', 'j', InputOption::VALUE_REQUIRED, 'The maximum number of concurrent jobs.', '4');
        $this->addOption('idle-time', null, InputOption::VALUE_REQUIRED, 'Time to sleep when the queue ran out of jobs.', '2');

        // Queue selection and worker identity.
        $this->addOption('queue', null, InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Restrict to one or more queues.', []);
        $this->addOption('worker-name', null, InputOption::VALUE_REQUIRED, 'The name that uniquely identifies this worker process.');
    }
108
109
    /**
     * Validates the command line options, cleans up stale jobs of this worker
     * and then runs the job loop until it terminates.
     *
     * {@inheritdoc}
     *
     * @throws InvalidArgumentException if max-runtime, max-concurrent-jobs or idle-time is not greater than zero
     * @throws \RuntimeException if an explicitly given worker name is longer than 50 characters
     *
     * @return int The exit code; 0 once the job loop has ended normally
     *
     * @noinspection ReturnTypeCanBeDeclaredInspection
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $startTime = time();

        $maxRuntime = (int) $input->getOption('max-runtime');
        if ($maxRuntime <= 0) {
            throw new InvalidArgumentException('The maximum runtime must be greater than zero.');
        }

        // Long runtimes get a random offset of up to two minutes in either direction.
        // NOTE(review): presumably so that workers started together do not all stop
        // at the same moment - confirm.
        if ($maxRuntime > 600) {
            $maxRuntime += random_int(-120, 120);
        }

        $maxJobs = (int) $input->getOption('max-concurrent-jobs');
        if ($maxJobs <= 0) {
            throw new InvalidArgumentException('The maximum number of jobs per queue must be greater than zero.');
        }

        $idleTime = (int) $input->getOption('idle-time');
        if ($idleTime <= 0) {
            throw new InvalidArgumentException('Time to sleep when idling must be greater than zero.');
        }

        // runJobs() only accepts an array, so normalise whatever the input returns
        // (null becomes [], a single string becomes a one-element list).
        $restrictedQueues = (array) $input->getOption('queue');

        /** @var string|null $workerName */
        $workerName = $input->getOption('worker-name');
        if ($workerName === null) {
            // No explicit name given: derive one from the host name and the process id.
            $workerName = gethostname() . '-' . getmypid();
        } elseif (\strlen($workerName) > 50) {
            throw new \RuntimeException(sprintf(
                '"worker-name" must not be longer than 50 chars, but got "%s" (%d chars).',
                $workerName,
                \strlen($workerName)
            ));
        }

        $this->env = (string) $input->getOption('env');
        // The property is a boolean flag, so cast to bool rather than to string.
        $this->verbose = (bool) $input->getOption('verbose');
        $this->output = $output;
        $this->entityManager->getConnection()->getConfiguration()->setSQLLogger(null);

        if ($this->verbose) {
            $this->output->writeln('Cleaning up stale jobs');
        }

        $this->cleanUpStaleJobs($workerName);

        $this->runJobs(
            $workerName,
            $startTime,
            $maxRuntime,
            $idleTime,
            $maxJobs,
            $restrictedQueues,
            $this->getContainer()->getParameter('setono_sylius_scheduler.queue_options_defaults'), // @todo DI
            $this->getContainer()->getParameter('setono_sylius_scheduler.queue_options') // @todo DI
        );

        // Report success explicitly instead of relying on null being treated as 0.
        return 0;
    }
173
174
    /**
     * Runs the worker loop and the subsequent shutdown sequence.
     *
     * The loop polls the running jobs and starts pending ones until either a
     * shutdown has been requested or more than $maxRuntime seconds have passed
     * since $startTime. Afterwards it keeps polling until no job is running.
     *
     * @param string|null $workerName
     * @param int $startTime Unix timestamp at which the worker was started
     * @param int $maxRuntime Maximum runtime of the worker in seconds
     * @param int $idleTime Seconds to sleep when no pending job is found
     * @param int $maxJobs Maximum number of jobs running at the same time
     * @param array $restrictedQueues
     * @param array $queueOptionsDefaults
     * @param array $queueOptions
     */
    private function runJobs(?string $workerName, int $startTime, int $maxRuntime, int $idleTime, int $maxJobs, array $restrictedQueues, array $queueOptionsDefaults, array $queueOptions): void
    {
        $signalsAvailable = \extension_loaded('pcntl');

        if ($this->verbose) {
            $this->output->writeln('Running jobs');
        }

        if ($signalsAvailable) {
            $this->setupSignalHandlers();
        }

        if ($this->verbose) {
            $this->output->writeln(
                $signalsAvailable
                    ? 'Signal Handlers have been installed.'
                    : 'PCNTL extension is not available. Signals cannot be processed.'
            );
        }

        do {
            // Deliver pending signals first so that a shutdown request is seen right away.
            if ($signalsAvailable) {
                pcntl_signal_dispatch();
            }

            $keepGoing = !$this->shouldShutdown && time() - $startTime <= $maxRuntime;
            if ($keepGoing) {
                $this->checkRunningJobs();
                $this->startJobs($workerName, $idleTime, $maxJobs, $restrictedQueues, $queueOptionsDefaults, $queueOptions);

                // Pause for a random 500-1000 ms before the next round.
                usleep(random_int(500, 1000) * 1000);
            }
        } while ($keepGoing);

        if ($this->verbose) {
            $this->output->writeln('Entering shutdown sequence, waiting for running jobs to terminate...');
        }

        // No new jobs are started any more; just wait for the remaining ones.
        while ([] !== $this->runningJobs) {
            sleep(5);
            $this->checkRunningJobs();
        }

        if ($this->verbose) {
            $this->output->writeln('All jobs finished, exiting.');
        }
    }
230
231
    /**
     * Installs a SIGTERM handler that flags the worker for shutdown.
     */
    private function setupSignalHandlers(): void
    {
        $onTerminate = function (): void {
            if ($this->verbose) {
                $this->output->writeln('Received SIGTERM signal.');
            }

            // The flag is read by the worker loop, which then stops starting new jobs.
            $this->shouldShutdown = true;
        };

        pcntl_signal(SIGTERM, $onTerminate);
    }
241
242
    /**
     * Starts pending jobs until $maxJobs jobs are running or the repository
     * has no startable job left.
     *
     * When no startable job is found the method sleeps for $idleTime seconds
     * and returns.
     *
     * @param string|null $workerName
     * @param int $idleTime
     * @param int $maxJobs
     * @param array $restrictedQueues
     * @param array $queueOptionsDefaults
     * @param array $queueOptions
     */
    private function startJobs(?string $workerName, int $idleTime, int $maxJobs, array $restrictedQueues, array $queueOptionsDefaults, array $queueOptions): void
    {
        $excludedIds = [];

        while (true) {
            if (\count($this->runningJobs) >= $maxJobs) {
                return;
            }

            // Recomputed on every round because each started job changes the per-queue counts.
            $excludedQueues = $this->getExcludedQueues($queueOptionsDefaults, $queueOptions, $maxJobs);

            $candidate = $this->jobRepository->findOneStartableAndAquireLock(
                $workerName,
                $excludedIds,
                $excludedQueues,
                $restrictedQueues
            );

            if (null === $candidate) {
                sleep($idleTime);

                return;
            }

            $this->startJob($candidate);
        }
    }
270
271
    /**
     * Returns the names of all queues that already run as many jobs as they
     * are allowed to, so that no further job is started for them.
     *
     * @param array $queueOptionsDefaults
     * @param array $queueOptions
     * @param int $maxConcurrentJobs Fallback limit for queues without a configured limit
     *
     * @return string[]
     */
    private function getExcludedQueues(array $queueOptionsDefaults, array $queueOptions, int $maxConcurrentJobs): array
    {
        $excludedQueues = [];
        foreach ($this->getRunningJobsPerQueue() as $queue => $count) {
            // PHP converts numeric-string array keys (e.g. "1") to integers. Cast back
            // so that the strictly typed string parameter below does not raise a
            // TypeError for such queue names.
            $queue = (string) $queue;

            if ($count >= $this->getMaxConcurrentJobs($queue, $queueOptionsDefaults, $queueOptions, $maxConcurrentJobs)) {
                $excludedQueues[] = $queue;
            }
        }

        return $excludedQueues;
    }
289
290
    /**
     * Resolves the concurrency limit of a queue.
     *
     * Precedence: the queue specific "max_concurrent_jobs" option, then the
     * "max_concurrent_jobs" default option, then $maxConcurrentJobs.
     *
     * @param string $queue
     * @param array $queueOptionsDefaults
     * @param array $queueOptions
     * @param int $maxConcurrentJobs
     *
     * @return int
     */
    private function getMaxConcurrentJobs(string $queue, array $queueOptionsDefaults, array $queueOptions, int $maxConcurrentJobs): int
    {
        $limit = $queueOptions[$queue]['max_concurrent_jobs']
            ?? $queueOptionsDefaults['max_concurrent_jobs']
            ?? $maxConcurrentJobs;

        return (int) $limit;
    }
310
311
    /**
     * Counts the currently running jobs grouped by their queue.
     *
     * @return array Number of running jobs indexed by queue name
     */
    private function getRunningJobsPerQueue(): array
    {
        $counts = [];
        foreach ($this->runningJobs as $entry) {
            /** @var JobInterface $runningJob */
            $runningJob = $entry['job'];
            $name = $runningJob->getQueue();

            $counts[$name] = ($counts[$name] ?? 0) + 1;
        }

        return $counts;
    }
330
331
    /**
     * Polls every running job once.
     *
     * For each job the output produced since the last poll is published as an
     * event (and echoed in verbose mode). Afterwards the job is either
     * terminated because it exceeded its maximum runtime, updated because its
     * process is still running, or closed because its process has exited.
     */
    private function checkRunningJobs(): void
    {
        // Iterate by reference: the output pointers are stored back into the entry.
        foreach ($this->runningJobs as $i => &$data) {
            /** @var JobInterface $job */
            $job = $data['job'];
            /** @var Process $process */
            $process = $data['process'];

            // Only look at what was written since the previous poll.
            $newOutput = (string) substr($process->getOutput(), $data['output_pointer']);
            $data['output_pointer'] += \strlen($newOutput);

            $newErrorOutput = (string) substr($process->getErrorOutput(), $data['error_output_pointer']);
            $data['error_output_pointer'] += \strlen($newErrorOutput);

            $newOutput = $this->dispatchNewOutput($job, $newOutput, NewOutputEvent::TYPE_STDOUT);
            $newErrorOutput = $this->dispatchNewOutput($job, $newErrorOutput, NewOutputEvent::TYPE_STDERR);

            if ($this->verbose) {
                $this->writeJobOutput($job, $newOutput);
                $this->writeJobOutput($job, $newErrorOutput);
            }

            if ($this->hasExceededMaxRuntime($job)) {
                $process->stop(5);

                $this->output->writeln($job . ' terminated; maximum runtime exceeded.');
                $this->jobManager->closeJob($job, JobInterface::STATE_TERMINATED);
                unset($this->runningJobs[$i]);

                continue;
            }

            if ($process->isRunning()) {
                $this->recordProgress($job, $newOutput, $newErrorOutput);

                continue;
            }

            $this->finishJob($job, $process, $data['start_time']);
            unset($this->runningJobs[$i]);
        }
        // Break the reference so that later writes to $data cannot touch the last entry.
        unset($data);

        gc_collect_cycles();
    }

    /**
     * Publishes a chunk of new process output and returns the chunk as the
     * event hands it back. An empty chunk is returned untouched without
     * dispatching anything.
     *
     * @param JobInterface $job
     * @param string $chunk
     * @param mixed $type One of the NewOutputEvent::TYPE_* constants
     *
     * @return mixed
     */
    private function dispatchNewOutput(JobInterface $job, string $chunk, $type)
    {
        // Compare against '' instead of using empty(): the chunk "0" is real output.
        if ('' === $chunk) {
            return $chunk;
        }

        $event = new NewOutputEvent($job, $chunk, $type);
        $this->eventDispatcher->dispatch(SetonoSyliusSchedulerPluginEvent::JOB_NEW_OUTPUT, $event);

        return $event->getNewOutput();
    }

    /**
     * Echoes a chunk of job output, prefixing every line with the job id.
     *
     * @param JobInterface $job
     * @param mixed $chunk
     */
    private function writeJobOutput(JobInterface $job, $chunk): void
    {
        $chunk = (string) $chunk;
        if ('' === $chunk) {
            return;
        }

        $prefix = 'Job ' . $job->getId() . ': ';
        $this->output->writeln($prefix . str_replace("\n", "\n" . $prefix, $chunk));
    }

    /**
     * Tells whether the job has a maximum runtime and has been running longer than that.
     *
     * @param JobInterface $job
     *
     * @return bool
     */
    private function hasExceededMaxRuntime(JobInterface $job): bool
    {
        $maxRuntime = $job->getMaxRuntime();
        if ($maxRuntime <= 0) {
            // A non-positive value means that no limit applies.
            return false;
        }

        $runtime = time() - $job->getStartedAt()->getTimestamp();

        return $runtime > $maxRuntime;
    }

    /**
     * Stores the latest output of a still running job, so that the output of
     * long running processes is updated regularly.
     *
     * @param JobInterface $job
     * @param mixed $newOutput
     * @param mixed $newErrorOutput
     */
    private function recordProgress(JobInterface $job, $newOutput, $newErrorOutput): void
    {
        $job->addOutput($newOutput);
        $job->addErrorOutput($newErrorOutput);
        $job->setCheckedAt(new \DateTime());

        $this->entityManager->persist($job);
        $this->entityManager->flush($job);
    }

    /**
     * Records the result of an exited process on its job and closes the job
     * as finished (exit code 0) or failed (any other exit code).
     *
     * @param JobInterface $job
     * @param Process $process
     * @param int $startTime Unix timestamp at which the process was started
     */
    private function finishJob(JobInterface $job, Process $process, int $startTime): void
    {
        $exitCode = $process->getExitCode();

        $this->output->writeln($job . ' finished with exit code ' . $exitCode . '.');

        // If the Job exited with an exception, let's reload it so that we
        // get access to the stack trace. This might be useful for listeners.
        $this->entityManager->refresh($job);

        $job->setExitCode($exitCode);
        $job->setOutput($process->getOutput());
        $job->setErrorOutput($process->getErrorOutput());
        $job->setRuntime(time() - $startTime);

        $newState = 0 === $exitCode ? JobInterface::STATE_FINISHED : JobInterface::STATE_FAILED;
        $this->jobManager->closeJob($job, $newState);
    }
407
408
    /**
     * Announces the transition of a job to the running state and, unless a
     * listener cancels it, launches the job as a separate process.
     *
     * @param JobInterface $job
     *
     * @throws \LogicException if the event requests a state other than running or canceled
     */
    private function startJob(JobInterface $job): void
    {
        $stateChange = new StateChangeEvent($job, JobInterface::STATE_RUNNING);
        $this->eventDispatcher->dispatch(SetonoSyliusSchedulerPluginEvent::JOB_STATE_CHANGED, $stateChange);
        $requestedState = $stateChange->getNewState();

        if (JobInterface::STATE_RUNNING !== $requestedState) {
            if (JobInterface::STATE_CANCELED !== $requestedState) {
                throw new \LogicException(sprintf('Unsupported new state "%s".', $requestedState));
            }

            // The job was canceled through the event: close it without running it.
            $this->jobManager->closeJob($job, JobInterface::STATE_CANCELED);

            return;
        }

        $job->setState(JobInterface::STATE_RUNNING);

        $this->entityManager->persist($job);
        $this->entityManager->flush($job);

        // Console invocation: basic arguments, then the command, then its own arguments.
        $commandLine = $this->getBasicCommandLineArgs();
        $commandLine[] = $job->getCommand();
        foreach ($job->getArgs() as $argument) {
            $commandLine[] = $argument;
        }

        $process = new Process($commandLine);
        $process->start();
        $this->output->writeln(sprintf('Started %s.', (string) $job));

        // The pointers track how much of the process output has been consumed so far.
        $this->runningJobs[] = [
            'process' => $process,
            'job' => $job,
            'start_time' => time(),
            'output_pointer' => 0,
            'error_output_pointer' => 0,
        ];
    }
455
456
    /**
     * Cleans up stale jobs.
     *
     * A stale job is a job where this command has exited with an error
     * condition, e.g. because it was terminated abruptly (a PHP segfault,
     * a SIGTERM signal, etc.). Such jobs are marked as incomplete when the
     * command is started again; a failure to do so is only reported on the
     * output.
     *
     * @param string $workerName
     */
    private function cleanUpStaleJobs(string $workerName): void
    {
        /** @var JobInterface[] $staleJobs */
        $staleJobs = $this->jobRepository->findStale($workerName);

        foreach ($staleJobs as $staleJob) {
            // If the original job has retry jobs, then one of them is still in
            // running state. We can skip the original job here as it will be
            // processed automatically once the retry job is processed.
            $isOriginalWithRetries = !$staleJob->isRetryJob() && count($staleJob->getRetryJobs()) > 0;
            if ($isOriginalWithRetries) {
                continue;
            }

            $commandLine = array_merge(
                $this->getBasicCommandLineArgs(),
                ['setono:scheduler:mark-incomplete', $staleJob->getId()]
            );

            // We use a separate process to clean up.
            $cleanup = new Process($commandLine);
            $exitCode = $cleanup->run();
            if (0 === $exitCode) {
                continue;
            }

            $failure = new ProcessFailedException($cleanup);

            $this->output->writeln(sprintf(
                'There was an error when marking %s as incomplete: %s',
                $staleJob,
                $failure->getMessage()
            ));
        }
    }
496
497
    /**
     * Builds the leading part of the command line used to spawn console sub-processes:
     * the PHP binary, the console script, the environment and, in verbose mode,
     * the verbose flag.
     *
     * @return array
     */
    private function getBasicCommandLineArgs(): array
    {
        // Prefer the explicitly configured console file over the currently running script.
        $consoleFile = $_SERVER['SYMFONY_CONSOLE_FILE'] ?? $_SERVER['argv'][0];

        $args = [PHP_BINARY, $consoleFile, '--env=' . $this->env];

        if (!$this->verbose) {
            return $args;
        }

        $args[] = '--verbose';

        return $args;
    }
514
}
515