CleanUpCommand::cleanUpExpiredJobs()   A
last analyzed

Complexity

Conditions 4
Paths 4

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
dl 0
loc 29
ccs 0
cts 16
cp 0
rs 9.456
c 0
b 0
f 0
cc 4
nc 4
nop 3
crap 20
1
<?php
2
3
namespace JMS\JobQueueBundle\Command;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\Persistence\ManagerRegistry as PersistenceManagerRegistry;
8
use JMS\JobQueueBundle\Entity\Job;
9
use JMS\JobQueueBundle\Entity\Repository\JobManager;
10
use Symfony\Component\Console\Command\Command;
11
use Symfony\Component\Console\Input\InputInterface;
12
use Symfony\Component\Console\Input\InputOption;
13
use Symfony\Component\Console\Output\OutputInterface;
14
15
class CleanUpCommand extends Command
16
{
17
    protected static $defaultName = 'jms-job-queue:clean-up';
18
19
    private $jobManager;
20
    private $registry;
21
22
    public function __construct(PersistenceManagerRegistry $registry, JobManager $jobManager)
23
    {
24
        $this->jobManager = $jobManager;
25
        $this->registry = $registry;
26
27
        parent::__construct();
28
    }
29
30
    protected function configure()
31
    {
32
        $this
33
            ->setDescription('Cleans up jobs which exceed the maximum retention time.')
34
            ->addOption('max-retention', null, InputOption::VALUE_REQUIRED, 'The maximum retention time (value must be parsable by DateTime).', '7 days')
35
            ->addOption('max-retention-succeeded', null, InputOption::VALUE_REQUIRED, 'The maximum retention time for succeeded jobs (value must be parsable by DateTime).', '1 hour')
36
            ->addOption('per-call', null, InputOption::VALUE_REQUIRED, 'The maximum number of jobs to clean-up per call.', 1000);
37
    }
38
39
    protected function execute(InputInterface $input, OutputInterface $output): int
40
    {
41
        /** @var EntityManager $em */
42
        $em = $this->registry->getManagerForClass(Job::class);
43
        $con = $em->getConnection();
44
45
        $this->cleanUpExpiredJobs($em, $con, $input);
46
        $this->collectStaleJobs($em);
47
48
        return Command::SUCCESS;
49
    }
50
51
    private function collectStaleJobs(EntityManager $em)
52
    {
53
        foreach ($this->findStaleJobs($em) as $job) {
54
            if ($job->isRetried()) {
55
                continue;
56
            }
57
58
            $this->jobManager->closeJob($job, Job::STATE_INCOMPLETE);
59
        }
60
    }
61
62
    /**
63
     * @return Job[]
64
     */
65
    private function findStaleJobs(EntityManager $em)
66
    {
67
        $excludedIds = array(-1);
68
69
        do {
70
            $em->clear();
71
72
            /** @var Job $job */
73
            $job = $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j
74
                                      WHERE j.state = :running AND j.workerName IS NOT NULL AND j.checkedAt < :maxAge
75
                                                AND j.id NOT IN (:excludedIds)")
76
                ->setParameter('running', Job::STATE_RUNNING)
77
                ->setParameter('maxAge', new \DateTime('-5 minutes'), 'datetime')
78
                ->setParameter('excludedIds', $excludedIds)
79
                ->setMaxResults(1)
80
                ->getOneOrNullResult();
81
82
            if ($job !== null) {
83
                $excludedIds[] = $job->getId();
84
85
                yield $job;
86
            }
87
        } while ($job !== null);
88
    }
89
90
    private function cleanUpExpiredJobs(EntityManager $em, Connection $con, InputInterface $input)
91
    {
92
        $incomingDepsSql = $con->getDatabasePlatform()->modifyLimitQuery("SELECT 1 FROM jms_job_dependencies WHERE dest_job_id = :id", 1);
93
94
        $count = 0;
95
        foreach ($this->findExpiredJobs($em, $input) as $job) {
96
            /** @var Job $job */
97
98
            $count++;
99
100
            $result = $con->executeQuery($incomingDepsSql, array('id' => $job->getId()));
101
            if ($result->fetchColumn() !== false) {
102
                $em->transactional(function () use ($em, $job) {
103
                    $this->resolveDependencies($em, $job);
104
                    $em->remove($job);
105
                });
106
107
                continue;
108
            }
109
110
            $em->remove($job);
111
112
            if ($count >= $input->getOption('per-call')) {
113
                break;
114
            }
115
        }
116
117
        $em->flush();
118
    }
119
120
    private function resolveDependencies(EntityManager $em, Job $job)
121
    {
122
        // If this job has failed, or has otherwise not succeeded, we need to set the
123
        // incoming dependencies to failed if that has not been done already.
124
        if (!$job->isFinished()) {
125
            foreach ($this->jobManager->findIncomingDependencies($job) as $incomingDep) {
126
                if ($incomingDep->isInFinalState()) {
127
                    continue;
128
                }
129
130
                $finalState = Job::STATE_CANCELED;
131
                if ($job->isRunning()) {
132
                    $finalState = Job::STATE_FAILED;
133
                }
134
135
                $this->jobManager->closeJob($incomingDep, $finalState);
136
            }
137
        }
138
139
        $em->getConnection()->executeStatement("DELETE FROM jms_job_dependencies WHERE dest_job_id = :id", array('id' => $job->getId()));
140
    }
141
142
    private function findExpiredJobs(EntityManager $em, InputInterface $input)
143
    {
144 View Code Duplication
        $succeededJobs = function (array $excludedIds) use ($em, $input) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
145
            return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.state = :succeeded AND j.id NOT IN (:excludedIds)")
146
                ->setParameter('maxRetentionTime', new \DateTime('-' . $input->getOption('max-retention-succeeded')))
147
                ->setParameter('excludedIds', $excludedIds)
148
                ->setParameter('succeeded', Job::STATE_FINISHED)
149
                ->setMaxResults(100)
150
                ->getResult();
151
        };
152
        yield from $this->whileResults($succeededJobs);
153
154 View Code Duplication
        $finishedJobs = function (array $excludedIds) use ($em, $input) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
155
            return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
156
                ->setParameter('maxRetentionTime', new \DateTime('-' . $input->getOption('max-retention')))
157
                ->setParameter('excludedIds', $excludedIds)
158
                ->setMaxResults(100)
159
                ->getResult();
160
        };
161
        yield from $this->whileResults($finishedJobs);
162
163 View Code Duplication
        $canceledJobs = function (array $excludedIds) use ($em, $input) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
164
            return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state = :canceled AND j.createdAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
165
                ->setParameter('maxRetentionTime', new \DateTime('-' . $input->getOption('max-retention')))
166
                ->setParameter('canceled', Job::STATE_CANCELED)
167
                ->setParameter('excludedIds', $excludedIds)
168
                ->setMaxResults(100)
169
                ->getResult();
170
        };
171
        yield from $this->whileResults($canceledJobs);
172
    }
173
174
    private function whileResults(callable $resultProducer)
175
    {
176
        $excludedIds = array(-1);
177
178
        do {
179
            /** @var Job[] $jobs */
180
            $jobs = $resultProducer($excludedIds);
181
            foreach ($jobs as $job) {
182
                $excludedIds[] = $job->getId();
183
                yield $job;
184
            }
185
        } while (!empty($jobs));
186
    }
187
}
188