Issues (35)

Security Analysis: no request data

This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.

  Cross-Site Scripting
Cross-Site Scripting enables an attacker to inject code into the response of a web-request that is viewed by other users. It can for example be used to bypass access controls, or even to take over other users' accounts.
  File Exposure
File Exposure allows an attacker to gain access to local files that they should not be able to access. These files can, for example, include database credentials or other configuration files.
  File Manipulation
File Manipulation enables an attacker to write custom data to files. This potentially leads to injection of arbitrary code on the server.
  Object Injection
Object Injection enables an attacker to inject an object into PHP code, and can lead to arbitrary code execution, file exposure, or file manipulation attacks.
  Code Injection
Code Injection enables an attacker to execute arbitrary code on the server.
  Response Splitting
Response Splitting can be used to send arbitrary responses.
  File Inclusion
File Inclusion enables an attacker to inject custom files into PHP's file loading mechanism, either explicitly passed to include, or for example via PHP's auto-loading mechanism.
  Command Injection
Command Injection enables an attacker to inject a shell command that is executed with the privileges of the web server. This can be used to expose sensitive data or to gain access to your server.
  SQL Injection
SQL Injection enables an attacker to execute arbitrary SQL code on your database server, gaining access to user data or manipulating it.
  XPath Injection
XPath Injection enables an attacker to modify which parts of an XML document are read. If that XML document is, for example, used for authentication, this can lead to further vulnerabilities similar to SQL Injection.
  LDAP Injection
LDAP Injection enables an attacker to inject LDAP statements potentially granting permission to run unauthorized queries, or modify content inside the LDAP tree.
  Header Injection
  Other Vulnerability
This category comprises other attack vectors such as manipulating the PHP runtime, loading custom extensions, freezing the runtime, or similar.
  Regex Injection
Regex Injection enables an attacker to execute arbitrary code in your PHP process.
  XML Injection
XML Injection enables an attacker to read files on your local filesystem including configuration files, or can be abused to freeze your web-server process.
  Variable Injection
Variable Injection enables an attacker to overwrite program variables with custom data, and can lead to further vulnerabilities.
Unfortunately, the security analysis is currently not available for your project. If you are a non-commercial open-source project, please contact support to gain access.
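As an illustration of the Command Injection category described above, here is a minimal sketch, not derived from this project's analysis results, of building a command as a list of arguments instead of concatenating one shell string. The helper name buildCommand and the sample values are hypothetical. Symfony's Process class accepts such a list and takes care of escaping each element.

```php
<?php
// Hypothetical helper (not part of the analysed project): builds a command
// as a list of arguments rather than as a single shell string.
function buildCommand(string $binary, string $script, array $userArgs): array
{
    $command = [$binary, $script];
    foreach ($userArgs as $arg) {
        // Each value stays a single list element, so it can later be passed
        // as exactly one argument, even if it contains shell metacharacters.
        $command[] = (string) $arg;
    }

    return $command;
}

// A hostile-looking value remains one argument instead of becoming a second command.
$command = buildCommand(PHP_BINARY, 'bin/console', ['report.txt; rm -rf /']);
```

The design point is that untrusted values never get spliced into a command string by hand; whether this applies to a given project depends on how the resulting list is executed.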

Command/RunCommand.php (7 issues)

<?php

/*
 * Copyright 2012 Johannes M. Schmitt <[email protected]>
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace JMS\JobQueueBundle\Command;

use Doctrine\ORM\EntityManager;
use Doctrine\Persistence\ManagerRegistry as PersistenceManagerRegistry;
use JMS\JobQueueBundle\Entity\Job;
use JMS\JobQueueBundle\Entity\Repository\JobManager;
use JMS\JobQueueBundle\Event\NewOutputEvent;
use JMS\JobQueueBundle\Event\StateChangeEvent;
use JMS\JobQueueBundle\Exception\InvalidArgumentException;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Process\Exception\ProcessFailedException;
use Symfony\Component\Process\Process;

class RunCommand extends Command
{
    protected static $defaultName = 'jms-job-queue:run';

    /** @var string */
    private $env;

    /** @var boolean */
    private $verbose;

    /** @var OutputInterface */
    private $output;

    /** @var PersistenceManagerRegistry */
    private $registry;

    /** @var JobManager */
    private $jobManager;

    /** @var EventDispatcherInterface */
    private $dispatcher;

    /** @var array */
    private $runningJobs = array();

    /** @var bool */
    private $shouldShutdown = false;

    /** @var array */
    private $queueOptionsDefault;

    /** @var array */
    private $queueOptions;

    public function __construct(PersistenceManagerRegistry $managerRegistry, JobManager $jobManager, EventDispatcherInterface $dispatcher, array $queueOptionsDefault, array $queueOptions)
    {
        $this->registry = $managerRegistry;
        $this->jobManager = $jobManager;
        $this->dispatcher = $dispatcher;
        $this->queueOptionsDefault = $queueOptionsDefault;
        $this->queueOptions = $queueOptions;

        parent::__construct();
    }

    protected function configure()
    {
        $this
            ->setDescription('Runs jobs from the queue.')
            ->addOption('max-runtime', 'r', InputOption::VALUE_REQUIRED, 'The maximum runtime in seconds.', 900)
            ->addOption('max-concurrent-jobs', 'j', InputOption::VALUE_REQUIRED, 'The maximum number of concurrent jobs.', 4)
            ->addOption('idle-time', null, InputOption::VALUE_REQUIRED, 'Time to sleep when the queue ran out of jobs.', 2)
            ->addOption('queue', null, InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Restrict to one or more queues.', array())
            ->addOption('worker-name', null, InputOption::VALUE_REQUIRED, 'The name that uniquely identifies this worker process.');
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $startTime = time();

        $maxRuntime = (int) $input->getOption('max-runtime');
        if ($maxRuntime <= 0) {
            throw new InvalidArgumentException('The maximum runtime must be greater than zero.');
        }

        if ($maxRuntime > 600) {
            $maxRuntime += random_int(-120, 120);
        }

        $maxJobs = (int) $input->getOption('max-concurrent-jobs');
        if ($maxJobs <= 0) {
            throw new InvalidArgumentException('The maximum number of jobs per queue must be greater than zero.');
        }

        $idleTime = (int) $input->getOption('idle-time');
        if ($idleTime <= 0) {
            throw new InvalidArgumentException('Time to sleep when idling must be greater than zero.');
        }

        $restrictedQueues = $input->getOption('queue');

        $workerName = $input->getOption('worker-name');
        if ($workerName === null) {
            $workerName = gethostname() . '-' . getmypid();
        }

        if (strlen($workerName) > 50) {
            throw new \RuntimeException(sprintf(
                '"worker-name" must not be longer than 50 chars, but got "%s" (%d chars).',
                $workerName,
                strlen($workerName)
            ));
        }

        $this->env = $input->getOption('env');
        $this->verbose = $input->getOption('verbose');
        $this->output = $output;
        $this->getEntityManager()->getConnection()->getConfiguration()->setSQLLogger(null);

        if ($this->verbose) {
            $this->output->writeln('<info>Cleaning up stale jobs</info>');
        }

        $this->cleanUpStaleJobs($workerName);

        $this->runJobs(
            $workerName,
            $startTime,
            $maxRuntime,
            $idleTime,
            $maxJobs,
            $restrictedQueues,
            $this->queueOptionsDefault,
            $this->queueOptions
        );

        return Command::SUCCESS;
    }

    private function runJobs($workerName, $startTime, $maxRuntime, $idleTime, $maxJobs, array $restrictedQueues, array $queueOptionsDefaults, array $queueOptions)
    {
        $hasPcntl = extension_loaded('pcntl');

        if ($this->verbose) {
            $this->output->writeln('<info>Running jobs</info>');
        }

        if ($hasPcntl) {
            $this->setupSignalHandlers();
            if ($this->verbose) {
                $this->output->writeln('<info>Signal Handlers have been installed.</info>');
            }
        } elseif ($this->verbose) {
            $this->output->writeln('<error>PCNTL extension is not available. Signals cannot be processed.</error>');
        }

        while (true) {
            if ($hasPcntl) {
                pcntl_signal_dispatch();
            }

            if ($this->shouldShutdown || time() - $startTime > $maxRuntime) {
                break;
            }

            $this->checkRunningJobs();
            $this->startJobs($workerName, $idleTime, $maxJobs, $restrictedQueues, $queueOptionsDefaults, $queueOptions);

            $waitTimeInMs = random_int(500, 1000);
            usleep($waitTimeInMs * 1E3);
        }

        if ($this->verbose) {
            $this->output->writeln('<info>Entering shutdown sequence, waiting for running jobs to terminate...</info>');
        }

        while (!empty($this->runningJobs)) {
            sleep(5);
            $this->checkRunningJobs();
        }

        if ($this->verbose) {
            $this->output->writeln('<info>All jobs finished, exiting.</info>');
        }
    }

    private function setupSignalHandlers()
    {
        pcntl_signal(SIGTERM, function () {
            if ($this->verbose) {
                $this->output->writeln('<info>Received SIGTERM signal.</info>');
            }

            $this->shouldShutdown = true;
        });
    }

    private function startJobs($workerName, $idleTime, $maxJobs, array $restrictedQueues, array $queueOptionsDefaults, array $queueOptions)
    {
        $excludedIds = array();
        while (count($this->runningJobs) < $maxJobs) {
            $pendingJob = $this->jobManager->findStartableJob(
                $workerName,
                $excludedIds,
                $this->getExcludedQueues($queueOptionsDefaults, $queueOptions, $maxJobs),
                $restrictedQueues
            );

            if (null === $pendingJob) {
                sleep($idleTime);

                return;
            }

            $this->startJob($pendingJob);
        }
    }

    private function getExcludedQueues(array $queueOptionsDefaults, array $queueOptions, $maxConcurrentJobs)
    {
        $excludedQueues = array();
        foreach ($this->getRunningJobsPerQueue() as $queue => $count) {
            if ($count >= $this->getMaxConcurrentJobs($queue, $queueOptionsDefaults, $queueOptions, $maxConcurrentJobs)) {
                $excludedQueues[] = $queue;
            }
        }

        return $excludedQueues;
    }

    private function getMaxConcurrentJobs($queue, array $queueOptionsDefaults, array $queueOptions, $maxConcurrentJobs)
    {
        if (isset($queueOptions[$queue]['max_concurrent_jobs'])) {
            return (int) $queueOptions[$queue]['max_concurrent_jobs'];
        }

        if (isset($queueOptionsDefaults['max_concurrent_jobs'])) {
            return (int) $queueOptionsDefaults['max_concurrent_jobs'];
        }

        return $maxConcurrentJobs;
    }

    private function getRunningJobsPerQueue()
    {
        $runningJobsPerQueue = array();
        foreach ($this->runningJobs as $jobDetails) {
            /** @var Job $job */
            $job = $jobDetails['job'];

            $queue = $job->getQueue();
            if (!isset($runningJobsPerQueue[$queue])) {
                $runningJobsPerQueue[$queue] = 0;
            }
            $runningJobsPerQueue[$queue] += 1;
        }

        return $runningJobsPerQueue;
    }

    private function checkRunningJobs()
    {
        foreach ($this->runningJobs as $i => &$data) {
            $newOutput = substr($data['process']->getOutput(), $data['output_pointer']);
            $data['output_pointer'] += strlen($newOutput);

            $newErrorOutput = substr($data['process']->getErrorOutput(), $data['error_output_pointer']);
            $data['error_output_pointer'] += strlen($newErrorOutput);

            if (!empty($newOutput)) {

Code Duplication
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

                $event = new NewOutputEvent($data['job'], $newOutput, NewOutputEvent::TYPE_STDOUT);
                $this->dispatcher->dispatch($event);

$event is of type object<JMS\JobQueueBundle\Event\NewOutputEvent>, but the function expects an object<Symfony\Contracts\EventDispatcher\object>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in, this might be fine. In other cases, however, this might be a bug.

We suggest adding an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend using
acceptsInteger((integer) $x);

                $newOutput = $event->getNewOutput();
            }

            if (!empty($newErrorOutput)) {

Code Duplication
This code seems to be duplicated across your project.

                $event = new NewOutputEvent($data['job'], $newErrorOutput, NewOutputEvent::TYPE_STDERR);
                $this->dispatcher->dispatch($event);

$event is of type object<JMS\JobQueueBundle\Event\NewOutputEvent>, but the function expects an object<Symfony\Contracts\EventDispatcher\object>.

                $newErrorOutput = $event->getNewOutput();
            }

            if ($this->verbose) {
                if (!empty($newOutput)) {

Code Duplication
This code seems to be duplicated across your project.

                    $this->output->writeln('Job ' . $data['job']->getId() . ': ' . str_replace("\n", "\nJob " . $data['job']->getId() . ": ", $newOutput));
                }

                if (!empty($newErrorOutput)) {

Code Duplication
This code seems to be duplicated across your project.

                    $this->output->writeln('Job ' . $data['job']->getId() . ': ' . str_replace("\n", "\nJob " . $data['job']->getId() . ": ", $newErrorOutput));
                }
            }

            // Check whether this process exceeds the maximum runtime, and terminate if that is
            // the case.
            $runtime = time() - $data['job']->getStartedAt()->getTimestamp();
            if ($data['job']->getMaxRuntime() > 0 && $runtime > $data['job']->getMaxRuntime()) {
                $data['process']->stop(5);

                $this->output->writeln(
                    sprintf('<error>%s terminated; maximum runtime exceeded.</error>', $data['job'])
                );
                $this->jobManager->closeJob($data['job'], Job::STATE_TERMINATED);
                unset($this->runningJobs[$i]);

                continue;
            }

            if ($data['process']->isRunning()) {
                // For long running processes, it is nice to update the output status regularly.
                $data['job']->addOutput($newOutput);
                $data['job']->addErrorOutput($newErrorOutput);
                $data['job']->checked();
                $em = $this->getEntityManager();
                $em->persist($data['job']);
                $em->flush();

                continue;
            }

            $this->output->writeln(
                sprintf(
                    '<info>%s finished with exit code %d.</info>',
                    $data['job'],
                    $data['process']->getExitCode()
                )
            );

            // If the Job exited with an exception, let's reload it so that we
            // get access to the stack trace. This might be useful for listeners.
            $this->getEntityManager()->refresh($data['job']);

            $data['job']->setExitCode($data['process']->getExitCode());
            $data['job']->setOutput($data['process']->getOutput());
            $data['job']->setErrorOutput($data['process']->getErrorOutput());
            $data['job']->setRuntime(time() - $data['start_time']);

            $newState = 0 === $data['process']->getExitCode() ? Job::STATE_FINISHED : Job::STATE_FAILED;
            $this->jobManager->closeJob($data['job'], $newState);
            unset($this->runningJobs[$i]);
        }

        gc_collect_cycles();
    }

    private function startJob(Job $job)
    {
        $event = new StateChangeEvent($job, Job::STATE_RUNNING);
        $this->dispatcher->dispatch($event);

$event is of type object<JMS\JobQueueBundle\Event\StateChangeEvent>, but the function expects an object<Symfony\Contracts\EventDispatcher\object>.

        $newState = $event->getNewState();

        if (Job::STATE_CANCELED === $newState) {
            $this->jobManager->closeJob($job, Job::STATE_CANCELED);

            return;
        }

        if (Job::STATE_RUNNING !== $newState) {
            throw new \LogicException(sprintf('Unsupported new state "%s".', $newState));
        }

        $job->setState(Job::STATE_RUNNING);
        $em = $this->getEntityManager();
        $em->persist($job);
        $em->flush();

        $args = $this->getBasicCommandLineArgs();
        $args[] = $job->getCommand();
        $args[] = '--jms-job-id=' . $job->getId();

        foreach ($job->getArgs() as $arg) {
            $args[] = $arg;
        }

        $proc = new Process($args);
        $proc->start();
        $this->output->writeln(sprintf('<info>Started %s.</info>', $job));

        $this->runningJobs[] = array(
            'process' => $proc,
            'job' => $job,
            'start_time' => time(),
            'output_pointer' => 0,
            'error_output_pointer' => 0,
        );
    }

    /**
     * Cleans up stale jobs.
     *
     * A stale job is a job where this command has exited with an error
     * condition. Although this command is very robust, there might be cases
     * where it might be terminated abruptly (like a PHP segfault, a SIGTERM signal, etc.).
     *
     * In such an error condition, these jobs are cleaned-up on restart of this command.
     */
    private function cleanUpStaleJobs($workerName)
    {
        /** @var Job[] $staleJobs */
        $staleJobs = $this->getEntityManager()->createQuery("SELECT j FROM " . Job::class . " j WHERE j.state = :running AND (j.workerName = :worker OR j.workerName IS NULL)")
            ->setParameter('worker', $workerName)
            ->setParameter('running', Job::STATE_RUNNING)
            ->getResult();

        foreach ($staleJobs as $job) {
            // If the original job has retry jobs, then one of them is still in
            // running state. We can skip the original job here as it will be
            // processed automatically once the retry job is processed.
            if (!$job->isRetryJob() && count($job->getRetryJobs()) > 0) {
                continue;
            }

            $args = $this->getBasicCommandLineArgs();
            $args[] = 'jms-job-queue:mark-incomplete';
            $args[] = $job->getId();

            // We use a separate process to clean up.
            $proc = new Process($args);
            if (0 !== $proc->run()) {
                $ex = new ProcessFailedException($proc);

                $this->output->writeln(sprintf('<error>There was an error when marking %s as incomplete: %s</error>', $job, $ex->getMessage()));
            }
        }
    }

    private function getBasicCommandLineArgs(): array
    {
        $args = array(
            PHP_BINARY,
            $_SERVER['SYMFONY_CONSOLE_FILE'] ?? $_SERVER['argv'][0],
            '--env=' . $this->env
        );

        if ($this->verbose) {
            $args[] = '--verbose';
        }

        return $args;
    }

    private function getEntityManager(): EntityManager
    {
        return $this->registry->getManagerForClass('JMSJobQueueBundle:Job');
    }
}
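
The four Code Duplication findings in checkRunningJobs() all concern the same two patterns applied once to standard output and once to error output. One possible way to address the verbose-output half is sketched below. The function names prefixJobOutput and writePrefixed are hypothetical and not part of the bundle, and the sketch uses a plain callable in place of OutputInterface so that it runs on its own.

```php
<?php
// Hypothetical helper: prefixes every line of a job's output with "Job <id>: ",
// mirroring the str_replace() expression used twice in checkRunningJobs().
function prefixJobOutput(int $jobId, string $output): string
{
    $prefix = 'Job ' . $jobId . ': ';

    return $prefix . str_replace("\n", "\n" . $prefix, $output);
}

// Hypothetical helper: writes the prefixed output through any writer callable.
// Note: the original code guards with !empty(), which also skips the string "0";
// this sketch only skips the empty string, which is a deliberate difference.
function writePrefixed(callable $writeln, int $jobId, string $output): void
{
    if ('' !== $output) {
        $writeln(prefixJobOutput($jobId, $output));
    }
}

// Usage: collect lines in an array instead of a console output.
$lines = [];
$collect = function (string $line) use (&$lines): void {
    $lines[] = $line;
};
writePrefixed($collect, 7, "first\nsecond");
writePrefixed($collect, 7, '');
```

With a helper of this shape, the stdout and stderr branches would each shrink to a single call; the event-dispatching pair could be extracted the same way.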
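
A note on the three type findings for dispatch($event): the reported expected type, Symfony\Contracts\EventDispatcher\object, looks like the built-in PHP `object` parameter type being read as a class name relative to the dispatcher's namespace. That reading is an assumption, since the dispatcher's signature is not shown on this page. If it holds, the finding is likely a false positive and the suggested type cast is unnecessary. The sketch below uses hypothetical stand-ins (DemoEvent, demoDispatch) to show how an `object` parameter behaves.

```php
<?php
// Hypothetical stand-in for an event class such as NewOutputEvent.
final class DemoEvent
{
}

// Hypothetical stand-in for a dispatcher method whose parameter is declared
// with the built-in `object` type (available since PHP 7.2).
function demoDispatch(object $event): object
{
    return $event;
}

// Any class instance satisfies `object`; no cast is needed.
$event = new DemoEvent();
$returned = demoDispatch($event);

// A scalar does not satisfy `object` and raises a TypeError.
$rejectedScalar = false;
try {
    demoDispatch('not an object');
} catch (\TypeError $e) {
    $rejectedScalar = true;
}
```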