RunCommand::runJobs()   C
last analyzed

Complexity

Conditions 12
Paths 192

Size

Total Lines 44
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 25
nc 192
nop 8
dl 0
loc 44
rs 6.2
c 0
b 0
f 0

How to fix   Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

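Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.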

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists: Replace Parameter with Method Call, Preserve Whole Object, and Introduce Parameter Object.

1
<?php
2
3
/** @noinspection PhpComposerExtensionStubsInspection */
4
5
declare(strict_types=1);
6
7
namespace Setono\SyliusSchedulerPlugin\Command;
8
9
use Doctrine\ORM\EntityManager;
10
use Setono\SyliusSchedulerPlugin\Doctrine\ORM\JobRepository;
11
use Setono\SyliusSchedulerPlugin\Event\NewOutputEvent;
12
use Setono\SyliusSchedulerPlugin\Event\StateChangeEvent;
13
use Setono\SyliusSchedulerPlugin\Exception\InvalidArgumentException;
14
use Setono\SyliusSchedulerPlugin\JobManager\JobManager;
15
use Setono\SyliusSchedulerPlugin\Model\JobInterface;
16
use Setono\SyliusSchedulerPlugin\SetonoSyliusSchedulerPluginEvent;
17
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
18
use Symfony\Component\Console\Input\InputInterface;
19
use Symfony\Component\Console\Input\InputOption;
20
use Symfony\Component\Console\Output\OutputInterface;
21
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
22
use Symfony\Component\Process\Exception\ProcessFailedException;
23
use Symfony\Component\Process\Process;
24
25
class RunCommand extends ContainerAwareCommand
0 ignored issues
show
Deprecated Code introduced by
The class Symfony\Bundle\Framework...d\ContainerAwareCommand has been deprecated: since Symfony 4.2, use {@see Command} instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

25
class RunCommand extends /** @scrutinizer ignore-deprecated */ ContainerAwareCommand
Loading history...
26
{
27
    protected static $defaultName = 'setono:scheduler:run';
28
29
    /**
30
     * @var EntityManager
31
     */
32
    private $entityManager;
33
34
    /**
35
     * @var EventDispatcherInterface
36
     */
37
    private $eventDispatcher;
38
39
    /**
40
     * @var JobManager
41
     */
42
    private $jobManager;
43
44
    /**
45
     * @var JobRepository
46
     */
47
    private $jobRepository;
48
49
    /**
50
     * @var string
51
     */
52
    private $env;
53
54
    /**
55
     * @var bool
56
     */
57
    private $verbose;
58
59
    /**
60
     * @var OutputInterface
61
     */
62
    private $output;
63
64
    /**
65
     * @var array
66
     */
67
    private $runningJobs = [];
68
69
    /**
70
     * @var bool
71
     */
72
    private $shouldShutdown = false;
73
74
    /**
     * Stores the collaborating services on the command and then initialises
     * the parent console command.
     *
     * @param JobManager $jobManager
     * @param JobRepository $jobRepository
     * @param EntityManager $entityManager
     * @param EventDispatcherInterface $eventDispatcher
     */
    public function __construct(
        JobManager $jobManager,
        JobRepository $jobRepository,
        EntityManager $entityManager,
        EventDispatcherInterface $eventDispatcher
    ) {
        // Persistence and event infrastructure.
        $this->entityManager = $entityManager;
        $this->eventDispatcher = $eventDispatcher;

        // Job specific services.
        $this->jobRepository = $jobRepository;
        $this->jobManager = $jobManager;

        parent::__construct();
    }
93
94
    /**
     * Declares the command description and its command-line options.
     *
     * All numeric defaults are given as strings; execute() casts them to int.
     *
     * @noinspection ReturnTypeCanBeDeclaredInspection
     */
    protected function configure()
    {
        $this->setDescription('Runs jobs from the queue.');

        // Limits that control how long and how wide the worker runs.
        $this->addOption('max-runtime', 'r', InputOption::VALUE_REQUIRED, 'The maximum runtime in seconds.', '900');
        $this->addOption('max-concurrent-jobs', 'j', InputOption::VALUE_REQUIRED, 'The maximum number of concurrent jobs.', '4');
        $this->addOption('idle-time', null, InputOption::VALUE_REQUIRED, 'Time to sleep when the queue ran out of jobs.', '2');

        // Queue selection and worker identity.
        $this->addOption('queue', null, InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Restrict to one or more queues.', []);
        $this->addOption('worker-name', null, InputOption::VALUE_REQUIRED, 'The name that uniquely identifies this worker process.');
    }
108
109
    /**
     * Validates the command-line options, prepares the worker state, cleans up
     * stale jobs and then enters the job loop.
     *
     * {@inheritdoc}
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     *
     * @return int always 0 once the job loop has ended
     *
     * @throws InvalidArgumentException if max-runtime, max-concurrent-jobs or idle-time is not greater than zero
     * @throws \RuntimeException if the given worker name is longer than 50 characters
     * @throws \Exception if random_int() cannot gather sufficient entropy
     *
     * @noinspection ReturnTypeCanBeDeclaredInspection
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $startTime = time();

        $maxRuntime = (int) $input->getOption('max-runtime');
        if ($maxRuntime <= 0) {
            throw new InvalidArgumentException('The maximum runtime must be greater than zero.');
        }

        // Runtimes above ten minutes are shifted by a random offset of up to two
        // minutes in either direction.
        // NOTE(review): presumably to keep several workers from restarting at
        // the same moment - confirm.
        if ($maxRuntime > 600) {
            $maxRuntime += random_int(-120, 120);
        }

        $maxJobs = (int) $input->getOption('max-concurrent-jobs');
        if ($maxJobs <= 0) {
            throw new InvalidArgumentException('The maximum number of jobs per queue must be greater than zero.');
        }

        $idleTime = (int) $input->getOption('idle-time');
        if ($idleTime <= 0) {
            throw new InvalidArgumentException('Time to sleep when idling must be greater than zero.');
        }

        // runJobs() requires an array. The option is declared with
        // VALUE_IS_ARRAY, so the cast only normalises unexpected scalar/null
        // values instead of letting them fail the strict type check.
        $restrictedQueues = (array) $input->getOption('queue');

        /** @var string|null $workerName */
        $workerName = $input->getOption('worker-name');
        if ($workerName === null) {
            // Fall back to "<hostname>-<pid>" when no explicit name was given.
            $workerName = gethostname() . '-' . getmypid();
        } elseif (\strlen($workerName) > 50) {
            throw new \RuntimeException(sprintf(
                '"worker-name" must not be longer than 50 chars, but got "%s" (%d chars).',
                $workerName,
                \strlen($workerName)
            ));
        }

        $this->env = (string) $input->getOption('env');
        // The property is declared as bool; it was previously assigned a string.
        $this->verbose = (bool) $input->getOption('verbose');
        $this->output = $output;
        $this->entityManager->getConnection()->getConfiguration()->setSQLLogger(null);

        if ($this->verbose) {
            $this->output->writeln('Cleaning up stale jobs');
        }

        $this->cleanUpStaleJobs($workerName);

        $this->runJobs(
            $workerName,
            $startTime,
            $maxRuntime,
            $idleTime,
            $maxJobs,
            $restrictedQueues,
            $this->getContainer()->getParameter('setono_sylius_scheduler.queue_options_defaults'), // @todo DI
            $this->getContainer()->getParameter('setono_sylius_scheduler.queue_options') // @todo DI
        );

        // Report success explicitly instead of relying on the implicit null.
        return 0;
    }
173
174
    /**
     * Main worker loop.
     *
     * Repeatedly checks the running jobs and starts new ones until either a
     * shutdown has been requested or more than $maxRuntime seconds have passed
     * since $startTime. Afterwards it waits until every job that is still
     * running has ended.
     *
     * @param string|null $workerName
     * @param int $startTime unix timestamp at which the command started
     * @param int $maxRuntime maximum runtime of the loop in seconds
     * @param int $idleTime seconds to sleep when no startable job is found
     * @param int $maxJobs maximum number of jobs running at the same time
     * @param array $restrictedQueues
     * @param array $queueOptionsDefaults
     * @param array $queueOptions
     */
    private function runJobs(?string $workerName, int $startTime, int $maxRuntime, int $idleTime, int $maxJobs, array $restrictedQueues, array $queueOptionsDefaults, array $queueOptions): void
    {
        $signalsAvailable = \extension_loaded('pcntl');

        if ($this->verbose) {
            $this->output->writeln('Running jobs');
        }

        if (!$signalsAvailable) {
            if ($this->verbose) {
                $this->output->writeln('PCNTL extension is not available. Signals cannot be processed.');
            }
        } else {
            $this->setupSignalHandlers();
            if ($this->verbose) {
                $this->output->writeln('Signal Handlers have been installed.');
            }
        }

        for (;;) {
            // Give pending signals a chance to raise the shutdown flag.
            if ($signalsAvailable) {
                pcntl_signal_dispatch();
            }

            if ($this->shouldShutdown) {
                break;
            }

            if (time() - $startTime > $maxRuntime) {
                break;
            }

            $this->checkRunningJobs();
            $this->startJobs($workerName, $idleTime, $maxJobs, $restrictedQueues, $queueOptionsDefaults, $queueOptions);

            // Pause for a random 500-1000 milliseconds between iterations.
            $pauseInMs = random_int(500, 1000);
            usleep($pauseInMs * 1000);
        }

        if ($this->verbose) {
            $this->output->writeln('Entering shutdown sequence, waiting for running jobs to terminate...');
        }

        // Poll every five seconds until no running job is left.
        while ([] !== $this->runningJobs) {
            sleep(5);
            $this->checkRunningJobs();
        }

        if ($this->verbose) {
            $this->output->writeln('All jobs finished, exiting.');
        }
    }
230
231
    /**
     * Installs a SIGTERM handler that raises the shutdown flag.
     *
     * The handler itself stops nothing; it only sets $shouldShutdown (and
     * prints a notice in verbose mode).
     */
    private function setupSignalHandlers(): void
    {
        $onTerminate = function () {
            if ($this->verbose) {
                $this->output->writeln('Received SIGTERM signal.');
            }

            $this->shouldShutdown = true;
        };

        pcntl_signal(SIGTERM, $onTerminate);
    }
241
242
    /**
     * Starts pending jobs until $maxJobs jobs are running.
     *
     * When the repository has no startable job to offer, the method sleeps
     * for $idleTime seconds and returns.
     *
     * NOTE(review): static analysis reports that $workerName may be null here
     * while JobRepository::findOneStartableAndAquireLock() only accepts a
     * string. execute() always replaces a missing worker name before calling
     * into this code.
     *
     * @param string|null $workerName
     * @param int $idleTime seconds to sleep when no startable job is found
     * @param int $maxJobs maximum number of jobs running at the same time
     * @param array $restrictedQueues
     * @param array $queueOptionsDefaults
     * @param array $queueOptions
     */
    private function startJobs(?string $workerName, int $idleTime, int $maxJobs, array $restrictedQueues, array $queueOptionsDefaults, array $queueOptions): void
    {
        // Kept as a variable on purpose.
        // NOTE(review): the repository may take this argument by reference - confirm.
        $excludedIds = [];

        while (count($this->runningJobs) < $maxJobs) {
            $saturatedQueues = $this->getExcludedQueues($queueOptionsDefaults, $queueOptions, $maxJobs);

            $candidate = $this->jobRepository->findOneStartableAndAquireLock(
                $workerName,
                $excludedIds,
                $saturatedQueues,
                $restrictedQueues
            );

            if (null !== $candidate) {
                $this->startJob($candidate);

                continue;
            }

            // Nothing to start right now: idle and hand control back to the caller.
            sleep($idleTime);

            return;
        }
    }
270
271
    /**
     * Returns the queues whose number of running jobs has reached the
     * concurrency limit that applies to them.
     *
     * @param array $queueOptionsDefaults
     * @param array $queueOptions
     * @param int $maxConcurrentJobs limit used when neither option array defines one
     *
     * @return string[] names of the queues that must not receive further jobs
     */
    private function getExcludedQueues(array $queueOptionsDefaults, array $queueOptions, int $maxConcurrentJobs): array
    {
        $excludedQueues = [];
        foreach ($this->getRunningJobsPerQueue() as $queue => $count) {
            // PHP turns numeric-string array keys (e.g. a queue named "1") into
            // integers. Under strict_types that integer would be rejected by the
            // string parameter of getMaxConcurrentJobs(), so restore the string.
            $queueName = (string) $queue;

            if ($count >= $this->getMaxConcurrentJobs($queueName, $queueOptionsDefaults, $queueOptions, $maxConcurrentJobs)) {
                $excludedQueues[] = $queueName;
            }
        }

        return $excludedQueues;
    }
289
290
    /**
     * Resolves the concurrency limit of a queue.
     *
     * Lookup order: the queue specific "max_concurrent_jobs" option, then the
     * "max_concurrent_jobs" default option, then $maxConcurrentJobs.
     *
     * @param string $queue
     * @param array $queueOptionsDefaults
     * @param array $queueOptions
     * @param int $maxConcurrentJobs
     *
     * @return int
     */
    private function getMaxConcurrentJobs(string $queue, array $queueOptionsDefaults, array $queueOptions, int $maxConcurrentJobs): int
    {
        $configuredLimit = $queueOptions[$queue]['max_concurrent_jobs']
            ?? $queueOptionsDefaults['max_concurrent_jobs']
            ?? null;

        if (null === $configuredLimit) {
            return $maxConcurrentJobs;
        }

        return (int) $configuredLimit;
    }
310
311
    /**
     * Counts the currently running jobs, grouped by the queue of each job.
     *
     * @return array number of running jobs indexed by queue
     */
    private function getRunningJobsPerQueue(): array
    {
        $counts = [];

        foreach ($this->runningJobs as $entry) {
            /** @var JobInterface $runningJob */
            $runningJob = $entry['job'];
            $queueName = $runningJob->getQueue();

            $counts[$queueName] = ($counts[$queueName] ?? 0) + 1;
        }

        return $counts;
    }
330
331
    /**
     * Polls every running job.
     *
     * For each entry of $runningJobs this method:
     *  - reads the output produced since the previous poll, dispatches it as
     *    JOB_NEW_OUTPUT events and echoes it in verbose mode,
     *  - stops the process and closes the job as terminated when the job has
     *    exceeded its maximum runtime,
     *  - stores intermediate output while the process is still running,
     *  - otherwise records exit code, output and runtime and closes the job as
     *    finished (exit code 0) or failed.
     */
    private function checkRunningJobs(): void
    {
        // Iterate by reference: the output pointers are stored back into the entry.
        foreach ($this->runningJobs as $i => &$data) {
            $newOutput = substr($data['process']->getOutput(), $data['output_pointer']);
            $data['output_pointer'] += \strlen($newOutput);

            $newErrorOutput = substr($data['process']->getErrorOutput(), $data['error_output_pointer']);
            $data['error_output_pointer'] += \strlen($newErrorOutput);

            // Compare with '' rather than using empty(): empty('0') is true, which
            // silently dropped a chunk consisting of the single character "0".
            if ('' !== (string) $newOutput) {
                $event = new NewOutputEvent($data['job'], $newOutput, NewOutputEvent::TYPE_STDOUT);
                $this->eventDispatcher->dispatch(SetonoSyliusSchedulerPluginEvent::JOB_NEW_OUTPUT, $event);
                // Listeners may have replaced the output.
                $newOutput = $event->getNewOutput();
            }

            if ('' !== (string) $newErrorOutput) {
                $event = new NewOutputEvent($data['job'], $newErrorOutput, NewOutputEvent::TYPE_STDERR);
                $this->eventDispatcher->dispatch(SetonoSyliusSchedulerPluginEvent::JOB_NEW_OUTPUT, $event);
                $newErrorOutput = $event->getNewOutput();
            }

            if ($this->verbose) {
                // Prefix every line of the output with the job id.
                if ('' !== (string) $newOutput) {
                    $this->output->writeln('Job ' . $data['job']->getId() . ': ' . str_replace("\n", "\nJob " . $data['job']->getId() . ': ', $newOutput));
                }

                if ('' !== (string) $newErrorOutput) {
                    $this->output->writeln('Job ' . $data['job']->getId() . ': ' . str_replace("\n", "\nJob " . $data['job']->getId() . ': ', $newErrorOutput));
                }
            }

            // Check whether this process exceeds the maximum runtime, and terminate if that is
            // the case.
            $runtime = time() - $data['job']->getStartedAt()->getTimestamp();
            /** @noinspection NotOptimalIfConditionsInspection */
            if ($data['job']->getMaxRuntime() > 0 && $runtime > $data['job']->getMaxRuntime()) {
                $data['process']->stop(5);

                $this->output->writeln($data['job'] . ' terminated; maximum runtime exceeded.');
                $this->jobManager->closeJob($data['job'], JobInterface::STATE_TERMINATED);
                unset($this->runningJobs[$i]);

                continue;
            }

            if ($data['process']->isRunning()) {
                // For long running processes, it is nice to update the output status regularly.
                $data['job']->addOutput($newOutput);
                $data['job']->addErrorOutput($newErrorOutput);
                $data['job']->setCheckedAt(new \DateTime());

                $this->entityManager->persist($data['job']);
                $this->entityManager->flush($data['job']);

                continue;
            }

            $this->output->writeln($data['job'] . ' finished with exit code ' . $data['process']->getExitCode() . '.');

            // If the Job exited with an exception, let's reload it so that we
            // get access to the stack trace. This might be useful for listeners.
            $this->entityManager->refresh($data['job']);

            $data['job']->setExitCode($data['process']->getExitCode());
            $data['job']->setOutput($data['process']->getOutput());
            $data['job']->setErrorOutput($data['process']->getErrorOutput());
            $data['job']->setRuntime(time() - $data['start_time']);

            $newState = 0 === $data['process']->getExitCode() ? JobInterface::STATE_FINISHED : JobInterface::STATE_FAILED;
            $this->jobManager->closeJob($data['job'], $newState);
            unset($this->runningJobs[$i]);
        }
        // Break the reference so later writes to $data cannot touch the last entry.
        unset($data);

        gc_collect_cycles();
    }
407
408
    /**
     * Launches the given job in a separate process and registers it as running.
     *
     * A JOB_STATE_CHANGED event is dispatched first. If the event comes back
     * with the canceled state, the job is closed as canceled and no process is
     * started.
     *
     * @param JobInterface $job
     *
     * @throws \LogicException if the event yields a state other than running or canceled
     */
    private function startJob(JobInterface $job): void
    {
        $stateChange = new StateChangeEvent($job, JobInterface::STATE_RUNNING);
        $this->eventDispatcher->dispatch(SetonoSyliusSchedulerPluginEvent::JOB_STATE_CHANGED, $stateChange);
        $requestedState = $stateChange->getNewState();

        if (JobInterface::STATE_RUNNING !== $requestedState) {
            if (JobInterface::STATE_CANCELED !== $requestedState) {
                throw new \LogicException(sprintf('Unsupported new state "%s".', $requestedState));
            }

            $this->jobManager->closeJob($job, JobInterface::STATE_CANCELED);

            return;
        }

        $job->setState(JobInterface::STATE_RUNNING);

        $this->entityManager->persist($job);
        $this->entityManager->flush($job);

        // Command line: <php> <console> --env=... [--verbose] <command> <job args...>
        $commandLine = $this->getBasicCommandLineArgs();
        $commandLine[] = $job->getCommand();
        foreach ($job->getArgs() as $jobArgument) {
            $commandLine[] = $jobArgument;
        }

        $process = new Process($commandLine);
        $process->start();

        $this->output->writeln(sprintf('Started %s.', (string) $job));

        // The output pointers record how much of each stream has been consumed.
        $this->runningJobs[] = [
            'process' => $process,
            'job' => $job,
            'start_time' => time(),
            'output_pointer' => 0,
            'error_output_pointer' => 0,
        ];
    }
455
456
    /**
     * Marks the stale jobs of this worker as incomplete.
     *
     * Stale jobs are the ones JobRepository::findStale() returns for the
     * worker, i.e. jobs left behind when this command ended abruptly (a PHP
     * segfault, for instance). Each of them is handed to the
     * "setono:scheduler:mark-incomplete" command in a separate process; a
     * failure of that process is reported on the output and does not stop the
     * clean-up.
     *
     * @param string $workerName
     */
    private function cleanUpStaleJobs(string $workerName): void
    {
        /** @var JobInterface[] $staleJobs */
        $staleJobs = $this->jobRepository->findStale($workerName);

        foreach ($staleJobs as $staleJob) {
            // An original job that has retry jobs is skipped: one of its retry
            // jobs is still in running state, and the original is processed
            // automatically once that retry job is processed.
            $needsCleanUp = $staleJob->isRetryJob() || 0 === count($staleJob->getRetryJobs());
            if (!$needsCleanUp) {
                continue;
            }

            $commandLine = $this->getBasicCommandLineArgs();
            $commandLine[] = 'setono:scheduler:mark-incomplete';
            $commandLine[] = $staleJob->getId();

            // The clean-up runs in its own process.
            $cleanUp = new Process($commandLine);
            $exitCode = $cleanUp->run();
            if (0 === $exitCode) {
                continue;
            }

            $failure = new ProcessFailedException($cleanUp);

            $this->output->writeln(sprintf(
                'There was an error when marking %s as incomplete: %s',
                $staleJob,
                $failure->getMessage()
            ));
        }
    }
496
497
    /**
     * Builds the leading arguments shared by every spawned console process:
     * the PHP binary, the console script, the environment option and, in
     * verbose mode, the verbose flag.
     *
     * @return array
     */
    private function getBasicCommandLineArgs(): array
    {
        // Prefer the explicitly configured console file over the script being run.
        $consoleScript = $_SERVER['SYMFONY_CONSOLE_FILE'] ?? $_SERVER['argv'][0];

        $baseArgs = [PHP_BINARY, $consoleScript, '--env=' . $this->env];

        if (!$this->verbose) {
            return $baseArgs;
        }

        $baseArgs[] = '--verbose';

        return $baseArgs;
    }
514
}
515