CleanUpCommand   A
last analyzed

Complexity

Total Complexity 22

Size/Duplication

Total Lines 173
Duplicated Lines 13.29 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
wmc 22
lcom 1
cbo 9
dl 23
loc 173
ccs 0
cts 97
cp 0
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
A configure() 0 8 1
A execute() 0 11 1
A collectStaleJobs() 0 10 3
A findStaleJobs() 0 24 3
A cleanUpExpiredJobs() 0 29 4
A resolveDependencies() 0 21 5
A findExpiredJobs() 23 31 1
A whileResults() 0 13 3

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
<?php
2
3
namespace JMS\JobQueueBundle\Command;
4
5
use Doctrine\DBAL\Connection;
6
use Doctrine\ORM\EntityManager;
7
use Doctrine\Persistence\ManagerRegistry as PersistenceManagerRegistry;
8
use JMS\JobQueueBundle\Entity\Job;
9
use JMS\JobQueueBundle\Entity\Repository\JobManager;
10
use Symfony\Component\Console\Command\Command;
11
use Symfony\Component\Console\Input\InputInterface;
12
use Symfony\Component\Console\Input\InputOption;
13
use Symfony\Component\Console\Output\OutputInterface;
14
15
class CleanUpCommand extends Command
16
{
17
    protected static $defaultName = 'jms-job-queue:clean-up';
18
19
    private $jobManager;
20
    private $registry;
21
22
    /**
     * Stores the injected collaborators and then runs the base Command constructor.
     *
     * @param PersistenceManagerRegistry $registry   Registry used to look up the manager for Job entities.
     * @param JobManager                 $jobManager Manager used to close jobs and find their incoming dependencies.
     */
    public function __construct(PersistenceManagerRegistry $registry, JobManager $jobManager)
    {
        $this->registry = $registry;
        $this->jobManager = $jobManager;

        // Collaborators are assigned before handing over to the base Command constructor.
        parent::__construct();
    }
29
30
    /**
     * Declares the command description and its three options.
     *
     * All options require a value:
     *  - max-retention           (default '7 days')
     *  - max-retention-succeeded (default '1 hour')
     *  - per-call                (default 1000)
     */
    protected function configure()
    {
        $this->setDescription('Cleans up jobs which exceed the maximum retention time.');

        $this->addOption(
            'max-retention',
            null,
            InputOption::VALUE_REQUIRED,
            'The maximum retention time (value must be parsable by DateTime).',
            '7 days'
        );

        $this->addOption(
            'max-retention-succeeded',
            null,
            InputOption::VALUE_REQUIRED,
            'The maximum retention time for succeeded jobs (value must be parsable by DateTime).',
            '1 hour'
        );

        $this->addOption(
            'per-call',
            null,
            InputOption::VALUE_REQUIRED,
            'The maximum number of jobs to clean-up per call.',
            1000
        );
    }
38
39
    /**
     * Runs both clean-up passes: expired jobs are removed first, then stale
     * running jobs are closed.
     *
     * @return int Command::SUCCESS once both passes have completed.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        /** @var EntityManager $entityManager */
        $entityManager = $this->registry->getManagerForClass(Job::class);

        $this->cleanUpExpiredJobs($entityManager, $entityManager->getConnection(), $input);
        $this->collectStaleJobs($entityManager);

        return Command::SUCCESS;
    }
50
51
    /**
     * Closes every stale job with the incomplete state, except jobs that
     * report themselves as already retried.
     */
    private function collectStaleJobs(EntityManager $em)
    {
        foreach ($this->findStaleJobs($em) as $staleJob) {
            if (!$staleJob->isRetried()) {
                $this->jobManager->closeJob($staleJob, Job::STATE_INCOMPLETE);
            }
        }
    }
61
62
    /**
     * Lazily yields running jobs that have a worker name and whose checkedAt
     * timestamp is more than five minutes old.
     *
     * Jobs are fetched one per query; the id of each yielded job is added to
     * the exclusion list of the following query. The list starts with -1,
     * presumably so that the NOT IN clause is never empty.
     *
     * @return \Generator|Job[]
     */
    private function findStaleJobs(EntityManager $em)
    {
        $seenIds = [-1];

        while (true) {
            // The entity manager is cleared before every lookup.
            $em->clear();

            $query = $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j
                                      WHERE j.state = :running AND j.workerName IS NOT NULL AND j.checkedAt < :maxAge
                                                AND j.id NOT IN (:excludedIds)");
            $query->setParameter('running', Job::STATE_RUNNING);
            $query->setParameter('maxAge', new \DateTime('-5 minutes'), 'datetime');
            $query->setParameter('excludedIds', $seenIds);
            $query->setMaxResults(1);

            /** @var Job|null $staleJob */
            $staleJob = $query->getOneOrNullResult();
            if ($staleJob === null) {
                // No further match: the generator is exhausted.
                return;
            }

            $seenIds[] = $staleJob->getId();

            yield $staleJob;
        }
    }
89
90
    /**
     * Removes expired jobs, processing at most `per-call` of them per invocation.
     *
     * For each expired job a probe query checks whether jms_job_dependencies
     * still holds a row with this job as dest_job_id. If it does, the
     * dependencies are resolved and the job is removed inside one transaction;
     * otherwise the job is only scheduled for removal and deleted by the final
     * flush.
     *
     * @param EntityManager  $em    Entity manager that handles Job entities.
     * @param Connection     $con   Connection used for the raw dependency probe.
     * @param InputInterface $input Source of the `per-call` and retention options.
     */
    private function cleanUpExpiredJobs(EntityManager $em, Connection $con, InputInterface $input)
    {
        $incomingDepsSql = $con->getDatabasePlatform()->modifyLimitQuery("SELECT 1 FROM jms_job_dependencies WHERE dest_job_id = :id", 1);

        // The option does not change while the command runs, so read it once.
        $perCall = $input->getOption('per-call');

        $count = 0;
        foreach ($this->findExpiredJobs($em, $input) as $job) {
            /** @var Job $job */
            $count++;

            // NOTE(review): fetchColumn() is deprecated in newer doctrine/dbal releases in
            // favour of fetchOne(); confirm the supported DBAL range before switching.
            $result = $con->executeQuery($incomingDepsSql, ['id' => $job->getId()]);
            if ($result->fetchColumn() !== false) {
                $em->transactional(function () use ($em, $job) {
                    $this->resolveDependencies($em, $job);
                    $em->remove($job);
                });
            } else {
                $em->remove($job);
            }

            // The limit is checked for every processed job, including those removed
            // together with their dependencies, so `per-call` is a hard upper bound.
            if ($count >= $perCall) {
                break;
            }
        }

        $em->flush();
    }
119
120
    /**
     * Closes the still-open incoming dependencies of a job that did not finish,
     * then deletes the jms_job_dependencies rows whose dest_job_id is this job.
     */
    private function resolveDependencies(EntityManager $em, Job $job)
    {
        // A job that has failed, or has otherwise not succeeded, must have its incoming
        // dependencies closed as well, unless that has been done already.
        if (!$job->isFinished()) {
            $incomingDependencies = $this->jobManager->findIncomingDependencies($job);

            foreach ($incomingDependencies as $dependency) {
                if (!$dependency->isInFinalState()) {
                    // Failed when the job is still marked as running, canceled otherwise.
                    $closingState = $job->isRunning() ? Job::STATE_FAILED : Job::STATE_CANCELED;

                    $this->jobManager->closeJob($dependency, $closingState);
                }
            }
        }

        $em->getConnection()->executeStatement(
            "DELETE FROM jms_job_dependencies WHERE dest_job_id = :id",
            ['id' => $job->getId()]
        );
    }
141
142
    /**
     * Lazily yields the jobs that have outlived their retention period.
     *
     * Three selections are drained one after another, each in batches of up to
     * 100 rows through whileResults():
     *  1. jobs in the finished state closed before the `max-retention-succeeded` cut-off,
     *  2. jobs in any state closed before the `max-retention` cut-off,
     *  3. canceled jobs created before the `max-retention` cut-off.
     * Every selection is restricted to jobs without an original job.
     *
     * @return \Generator|Job[]
     */
    private function findExpiredJobs(EntityManager $em, InputInterface $input)
    {
        // Each entry: DQL, name of the retention option, additional state parameters.
        $selections = [
            [
                "SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.state = :succeeded AND j.id NOT IN (:excludedIds)",
                'max-retention-succeeded',
                ['succeeded' => Job::STATE_FINISHED],
            ],
            [
                "SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)",
                'max-retention',
                [],
            ],
            [
                "SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state = :canceled AND j.createdAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)",
                'max-retention',
                ['canceled' => Job::STATE_CANCELED],
            ],
        ];

        foreach ($selections as [$dql, $retentionOption, $stateParameters]) {
            $fetchBatch = function (array $excludedIds) use ($em, $input, $dql, $retentionOption, $stateParameters) {
                $query = $em->createQuery($dql);

                // The cut-off is computed per batch, at the moment the batch is fetched.
                $query->setParameter('maxRetentionTime', new \DateTime('-' . $input->getOption($retentionOption)));
                $query->setParameter('excludedIds', $excludedIds);
                foreach ($stateParameters as $parameterName => $parameterValue) {
                    $query->setParameter($parameterName, $parameterValue);
                }
                $query->setMaxResults(100);

                return $query->getResult();
            };

            yield from $this->whileResults($fetchBatch);
        }
    }
173
174
    /**
     * Repeatedly calls the producer and yields its jobs until it returns an empty batch.
     *
     * The producer receives the ids of all jobs yielded so far (plus an initial -1)
     * so that it can exclude them from its next batch.
     *
     * @param callable $resultProducer Receives the id list and returns the next batch of jobs.
     *
     * @return \Generator|Job[]
     */
    private function whileResults(callable $resultProducer)
    {
        $seenIds = [-1];

        while (true) {
            /** @var Job[] $batch */
            $batch = $resultProducer($seenIds);

            foreach ($batch as $job) {
                $seenIds[] = $job->getId();

                yield $job;
            }

            // An empty batch means the producer has nothing left to offer.
            if (empty($batch)) {
                return;
            }
        }
    }
187
}
188