ScheduleCommand::execute()   B
last analyzed

Complexity

Conditions 8
Paths 12

Size

Total Lines 43

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 8.4551

Importance

Changes 0
Metric Value
dl 0
loc 43
ccs 21
cts 26
cp 0.8077
rs 7.9875
c 0
b 0
f 0
cc 8
nc 12
nop 2
crap 8.4551
1
<?php
2
3
namespace JMS\JobQueueBundle\Command;
4
5
use Doctrine\ORM\EntityManager;
6
use Doctrine\ORM\Query;
7
use Doctrine\Persistence\ManagerRegistry as PersistenceManagerRegistry;
8
use JMS\JobQueueBundle\Console\CronCommand;
9
use JMS\JobQueueBundle\Cron\CommandScheduler;
10
use JMS\JobQueueBundle\Cron\JobScheduler;
11
use JMS\JobQueueBundle\Entity\CronJob;
12
use JMS\JobQueueBundle\Entity\Job;
13
use Symfony\Component\Console\Command\Command;
14
use Symfony\Component\Console\Input\InputInterface;
15
use Symfony\Component\Console\Input\InputOption;
16
use Symfony\Component\Console\Output\OutputInterface;
17
18
class ScheduleCommand extends Command
19
{
20
    protected static $defaultName = 'jms-job-queue:schedule';
21
22
    private $registry;
23
    private $schedulers;
24
    private $cronCommands;
25
26 2
    /**
     * Stores the collaborators used by the scheduling loop and initialises the Symfony command.
     *
     * @param PersistenceManagerRegistry $managerRegistry registry used to look up the managers for CronJob and Job
     * @param iterable                   $schedulers      job schedulers; each is asked for its commands via getCommands()
     * @param iterable                   $cronCommands    Symfony commands that are wrapped in a CommandScheduler
     */
    public function __construct(PersistenceManagerRegistry $managerRegistry, iterable $schedulers, iterable $cronCommands)
    {
        $this->cronCommands = $cronCommands;
        $this->schedulers = $schedulers;
        $this->registry = $managerRegistry;

        // The parent constructor is invoked last, after all collaborators are in place.
        parent::__construct();
    }
34
35 2
    /**
     * Declares the command description and its two options.
     *
     * - max-runtime: defaults to 3600.
     * - min-job-interval: defaults to 5.
     */
    protected function configure()
    {
        $this->setDescription('Schedules jobs at defined intervals');

        $this->addOption(
            'max-runtime',
            null,
            InputOption::VALUE_REQUIRED,
            'The maximum runtime of this command.',
            3600
        );

        $this->addOption(
            'min-job-interval',
            null,
            InputOption::VALUE_REQUIRED,
            'The minimum time between schedules jobs in seconds.',
            5
        );
    }
42
43 2
    /**
     * Runs the scheduling loop until the configured maximum runtime has elapsed.
     *
     * Each pass asks every known scheduler whether its command is due and, if so,
     * queues a job for it; the loop then sleeps for whatever is left of the
     * minimum job interval before starting the next pass.
     *
     * @param InputInterface  $input  provides the "max-runtime" and "min-job-interval" options
     * @param OutputInterface $output receives progress messages
     *
     * @return int Command::SUCCESS when the loop ends normally or when there is nothing to schedule
     *
     * @throws \RuntimeException when an option is not greater than zero, or when no ORM
     *                           entity manager is available for the CronJob entity
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        // Options arrive from the command line as strings; normalise to int up front so
        // that a non-numeric value ends in the validation error below instead of a TypeError.
        $maxRuntime = (int) $input->getOption('max-runtime');
        if ($maxRuntime > 300) {
            // Long runtimes are extended by a random amount of up to 5%.
            $maxRuntime += random_int(0, (int) ($maxRuntime * 0.05));
        }
        if ($maxRuntime <= 0) {
            throw new \RuntimeException('Max. runtime must be greater than zero.');
        }

        $minJobInterval = (int) $input->getOption('min-job-interval');
        if ($minJobInterval <= 0) {
            throw new \RuntimeException('Min. job interval must be greater than zero.');
        }

        $jobSchedulers = $this->populateJobSchedulers();
        if (empty($jobSchedulers)) {
            $output->writeln('<info>No job schedulers found, exiting...</info>');

            return Command::SUCCESS;
        }

        // getManagerForClass() is declared to return ObjectManager|null, whereas
        // populateJobsLastRunAt() requires an ORM EntityManager. Check explicitly so a
        // missing or foreign manager fails with a clear message.
        $em = $this->registry->getManagerForClass(CronJob::class);
        if (!$em instanceof EntityManager) {
            throw new \RuntimeException(sprintf('No Doctrine ORM entity manager is available for "%s".', CronJob::class));
        }

        $jobsLastRunAt = $this->populateJobsLastRunAt($em, $jobSchedulers);

        $startedAt = time();
        while (true) {
            $lastRunAt = microtime(true);
            if (time() - $startedAt > $maxRuntime) {
                $output->writeln('<info>Max. runtime reached, exiting...</info>');
                break;
            }

            $this->scheduleJobs($output, $jobSchedulers, $jobsLastRunAt);

            // Sleep only for the remainder of the interval: the time this pass took
            // is subtracted from the minimum interval rather than added to it.
            $timeToWait = $lastRunAt + $minJobInterval - microtime(true);
            if ($timeToWait > 0) {
                // usleep() expects whole microseconds; cast explicitly to avoid the
                // implicit lossy float-to-int conversion deprecated since PHP 8.1.
                usleep((int) ($timeToWait * 1E6));
            }
        }

        return Command::SUCCESS;
    }
86
87
    /**
     * Queues a job for every scheduler that reports its command as due and for
     * which the lock on the corresponding cron job row could be acquired.
     *
     * @param OutputInterface $output        receives one line per scheduled command
     * @param JobScheduler[]  $jobSchedulers schedulers keyed by command name
     * @param \DateTime[]     $jobsLastRunAt last-run timestamps keyed by command name; updated in place
     */
    private function scheduleJobs(OutputInterface $output, array $jobSchedulers, array &$jobsLastRunAt)
    {
        foreach ($jobSchedulers as $name => $scheduler) {
            $previousRunAt = $jobsLastRunAt[$name];

            if ($scheduler->shouldSchedule($name, $previousRunAt)) {
                [$lockAcquired, $currentRunAt] = $this->acquireLock($name, $previousRunAt);

                // Remember the timestamp returned by the lock attempt whether or not it succeeded.
                $jobsLastRunAt[$name] = $currentRunAt;

                if (!$lockAcquired) {
                    continue;
                }

                $output->writeln('Scheduling command ' . $name);

                // The job is created from the timestamp that was known before the lock attempt.
                $job = $scheduler->createJob($name, $previousRunAt);

                $manager = $this->registry->getManagerForClass(Job::class);
                $manager->persist($job);
                $manager->flush($job);
            }
        }
    }
112
113 2
    /**
     * Tries to claim the next run of a command by moving its stored lastRunAt forward.
     *
     * The UPDATE only matches while the stored lastRunAt still equals the value passed in.
     *
     * @param string    $commandName name of the command whose cron job row is updated
     * @param \DateTime $lastRunAt   the last-run timestamp currently known to this process
     *
     * @return array [true, new timestamp] when a row was updated; otherwise
     *               [false, lastRunAt freshly read from the CronJob entity]
     */
    private function acquireLock($commandName, \DateTime $lastRunAt)
    {
        /** @var EntityManager $em */
        $em = $this->registry->getManagerForClass(CronJob::class);
        $connection = $em->getConnection();

        // Re-establish the connection when it is reported as not connected (issue #178).
        if (!$connection->isConnected()) {
            $connection->close();
            $connection->connect();
        }

        $now = new \DateTime();

        $parameters = [
            'now' => $now,
            'command' => $commandName,
            'lastRunAt' => $lastRunAt,
        ];
        $parameterTypes = [
            'now' => 'datetime',
            'lastRunAt' => 'datetime',
        ];

        $updatedRows = $connection->executeStatement(
            "UPDATE jms_cron_jobs SET lastRunAt = :now WHERE command = :command AND lastRunAt = :lastRunAt",
            $parameters,
            $parameterTypes
        );

        if ($updatedRows <= 0) {
            // No row matched: reload the entity, bypassing cached state, to get the stored timestamp.
            $query = $em->createQuery("SELECT j FROM " . CronJob::class . " j WHERE j.command = :command");
            $query->setParameter('command', $commandName);
            $query->setHint(Query::HINT_REFRESH, true);

            /** @var CronJob $cronJob */
            $cronJob = $query->getSingleResult();

            return [false, $cronJob->getLastRunAt()];
        }

        return [true, $now];
    }
151
152 2
    /**
     * Builds the map of command name to scheduler.
     *
     * Entries from the injected schedulers are added first; each cron command is then
     * wrapped in a CommandScheduler and stored under its own name, replacing any
     * earlier entry with the same name.
     *
     * @return array schedulers keyed by command name
     *
     * @throws \RuntimeException when a cron command is not a Symfony Command
     */
    private function populateJobSchedulers()
    {
        $byCommandName = [];

        foreach ($this->schedulers as $jobScheduler) {
            /** @var JobScheduler $jobScheduler */
            foreach ($jobScheduler->getCommands() as $commandName) {
                $byCommandName[$commandName] = $jobScheduler;
            }
        }

        foreach ($this->cronCommands as $cronCommand) {
            /** @var CronCommand $cronCommand */
            if (!($cronCommand instanceof Command)) {
                throw new \RuntimeException('CronCommand should only be used on Symfony commands.');
            }

            $commandName = $cronCommand->getName();
            $byCommandName[$commandName] = new CommandScheduler($commandName, $cronCommand);
        }

        return $byCommandName;
    }
173
174 2
    /**
     * Collects the last-run timestamp of every known cron job and creates a CronJob
     * record for each scheduled command that does not have one yet.
     *
     * @param EntityManager $em            manager used to read and persist CronJob entities
     * @param array         $jobSchedulers schedulers keyed by command name; only the keys are used
     *
     * @return array last-run timestamps keyed by command name
     */
    private function populateJobsLastRunAt(EntityManager $em, array $jobSchedulers)
    {
        $lastRunByCommand = [];

        $existingJobs = $em->getRepository(CronJob::class)->findAll();
        foreach ($existingJobs as $cronJob) {
            /** @var CronJob $cronJob */
            $lastRunByCommand[$cronJob->getCommand()] = $cronJob->getLastRunAt();
        }

        foreach (array_keys($jobSchedulers) as $commandName) {
            if (isset($lastRunByCommand[$commandName])) {
                continue;
            }

            // No timestamp recorded for this command yet: register a new cron job for it.
            $newJob = new CronJob($commandName);
            $em->persist($newJob);
            $lastRunByCommand[$commandName] = $newJob->getLastRunAt();
        }

        // A single flush writes all newly persisted cron jobs.
        $em->flush();

        return $lastRunByCommand;
    }
194
}
195