CleanUpCommand::closeStaleJobs()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 4
nc 3
nop 0
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Setono\SyliusSchedulerPlugin\Command;
6
7
use Doctrine\ORM\EntityManager;
8
use Setono\SyliusSchedulerPlugin\Doctrine\ORM\JobRepository;
9
use Setono\SyliusSchedulerPlugin\JobManager\JobManager;
10
use Setono\SyliusSchedulerPlugin\Model\JobInterface;
11
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
12
use Symfony\Component\Console\Input\InputInterface;
13
use Symfony\Component\Console\Input\InputOption;
14
use Symfony\Component\Console\Output\OutputInterface;
15
16
class CleanUpCommand extends ContainerAwareCommand
0 ignored issues
show
Deprecated Code introduced by
The class Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand has been deprecated since Symfony 4.2; use {@see Command} instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

16
class CleanUpCommand extends /** @scrutinizer ignore-deprecated */ ContainerAwareCommand
Loading history...
17
{
18
    /**
     * Name of this console command.
     *
     * @var string
     */
    protected static $defaultName = 'setono:scheduler:clean-up';
19
20
    /**
21
     * Used to close stale jobs and unresolved incoming dependencies via closeJob().
     *
     * @var JobManager
22
     */
23
    private $jobManager;
24
25
    /**
26
     * Source of the stale jobs, expired jobs and incoming dependencies this command works on.
     *
     * @var JobRepository
27
     */
28
    private $jobRepository;
29
30
    /**
31
     * Used to clear, remove and flush entities and to obtain the DBAL connection for raw SQL.
     *
     * @var EntityManager
32
     */
33
    private $entityManager;
34
35
    /**
     * Stores the three collaborators the clean-up relies on.
     *
     * @param JobManager    $jobManager    closes jobs with a given final state
     * @param JobRepository $jobRepository looks up stale jobs, expired jobs and dependencies
     * @param EntityManager $entityManager removes jobs and exposes the database connection
     */
    public function __construct(JobManager $jobManager, JobRepository $jobRepository, EntityManager $entityManager)
    {
        // The parent constructor is invoked before any property is assigned.
        parent::__construct();

        $this->entityManager = $entityManager;
        $this->jobRepository = $jobRepository;
        $this->jobManager = $jobManager;
    }
51
52
    /**
     * Declares the command description and its three options:
     * "max-retention" (default "7 days"), "max-retention-succeeded"
     * (default "1 hour") and "per-call" (default "1000").
     *
     * @noinspection ReturnTypeCanBeDeclaredInspection
     */
    protected function configure()
    {
        $this->setDescription('Cleans up jobs which exceed the maximum retention time.');

        $this->addOption(
            'max-retention',
            null,
            InputOption::VALUE_REQUIRED,
            'The maximum retention time (value must be parsable by DateTime).',
            '7 days'
        );

        $this->addOption(
            'max-retention-succeeded',
            null,
            InputOption::VALUE_REQUIRED,
            'The maximum retention time for succeeded jobs (value must be parsable by DateTime).',
            '1 hour'
        );

        $this->addOption(
            'per-call',
            null,
            InputOption::VALUE_REQUIRED,
            'The maximum number of jobs to clean-up per call.',
            '1000'
        );
    }
64
65
    /**
     * Runs the clean-up: first removes expired jobs, then closes stale jobs.
     *
     * {@inheritdoc}
     *
     * @return int the process exit status; always 0 when no exception is thrown
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->removeExpiredJobs($input);
        $this->closeStaleJobs();

        // Symfony Console expects an integer exit status from execute(); returning
        // nothing is deprecated since 4.4 and rejected by 5.x. Older versions treat
        // a missing return value as 0, so this is backward-compatible.
        return 0;
    }
73
74
    /**
     * Closes every stale job that has not been retried, using the
     * JobInterface::STATE_INCOMPLETE state. Jobs reporting isRetried() are left untouched.
     */
    private function closeStaleJobs(): void
    {
        foreach ($this->findStaleJobs() as $staleJob) {
            if (!$staleJob->isRetried()) {
                $this->jobManager->closeJob($staleJob, JobInterface::STATE_INCOMPLETE);
            }
        }
    }
87
88
    /**
     * Yields stale jobs one at a time until the repository returns none.
     *
     * The entity manager is cleared before every lookup, and the IDs of all jobs
     * yielded so far are handed to findOneStale() as the exclusion list.
     *
     * @return \Generator|JobInterface[]
     */
    private function findStaleJobs(): \Generator
    {
        // Seeded with -1; presumably so the exclusion list is never empty — confirm against the repository.
        $seenIds = [-1];

        while (true) {
            $this->entityManager->clear();

            /** @var JobInterface|null $staleJob */
            $staleJob = $this->jobRepository->findOneStale($seenIds);
            if (null === $staleJob) {
                return;
            }

            $seenIds[] = $staleJob->getId();

            yield $staleJob;
        }
    }
108
109
    /**
     * Removes expired jobs, processing at most "per-call" jobs once the limit is reached.
     *
     * For each expired job a one-row query checks whether any row in
     * setono_sylius_scheduler_job_dependencies points at it as destination. If so,
     * the dependencies are resolved and the job is removed inside one
     * transactional() call; otherwise the job is only scheduled for removal and
     * written out by the final flush().
     *
     * @param InputInterface $input provides the "per-call" option and the retention options
     */
    private function removeExpiredJobs(InputInterface $input): void
    {
        $perCall = (int) $input->getOption('per-call');

        $connection = $this->entityManager->getConnection();
        $incomingDepsSql = $connection->getDatabasePlatform()
            ->modifyLimitQuery('SELECT 1 FROM setono_sylius_scheduler_job_dependencies WHERE destination_job_id = :id', 1)
        ;

        $count = 0;
        foreach ($this->findExpiredJobs($input) as $job) {
            ++$count;

            $result = $connection->executeQuery($incomingDepsSql, ['id' => $job->getId()]);
            if ($result->fetchColumn() !== false) {
                $this->entityManager->transactional(function () use ($job) {
                    $this->resolveDependencies($job);
                    $this->entityManager->remove($job);
                });
            } else {
                $this->entityManager->remove($job);
            }

            // The limit is checked for every processed job. Previously the branch
            // above ended with `continue`, so jobs with incoming dependencies
            // never triggered the check and the limit could be exceeded.
            if ($count >= $perCall) {
                break;
            }
        }

        $this->entityManager->flush();
    }
144
145
    /**
     * Closes the jobs that depend on the given job (when it has not finished) and
     * deletes the dependency rows pointing at it.
     *
     * @param JobInterface $job the job that is about to be removed
     */
    private function resolveDependencies(JobInterface $job): void
    {
        // A job that did not finish leaves its dependants unresolved: every
        // dependant not yet in a final state is closed as failed when this job
        // is still running, and as canceled otherwise.
        if (!$job->isFinished()) {
            /** @var JobInterface $dependentJob */
            foreach ($this->jobRepository->findIncomingDependencies($job) as $dependentJob) {
                if (!$dependentJob->isInFinalState()) {
                    /** @noinspection DisconnectedForeachInstructionInspection */
                    $closingState = $job->isRunning()
                        ? JobInterface::STATE_FAILED
                        : JobInterface::STATE_CANCELED;

                    $this->jobManager->closeJob($dependentJob, $closingState);
                }
            }
        }

        // The dependency rows are deleted whether or not the job finished.
        $connection = $this->entityManager->getConnection();
        $connection->executeUpdate(
            'DELETE FROM setono_sylius_scheduler_job_dependencies WHERE destination_job_id = :id',
            ['id' => $job->getId()]
        );
    }
175
176
    /**
     * Yields the jobs to remove, in three passes:
     *
     *  1. results of findSucceededBefore() for the "max-retention-succeeded" cutoff,
     *  2. results of findFinishedBefore() for the "max-retention" cutoff,
     *  3. results of findCancelledBefore() for the "max-retention" cutoff.
     *
     * Each pass is drained batch by batch through whileResults(); the cutoff is
     * recomputed for every batch.
     *
     * @param InputInterface $input provides the retention options
     *
     * @return \Generator|JobInterface[]
     *
     * @throws \InvalidArgumentException when a retention option is not a string
     */
    private function findExpiredJobs(InputInterface $input): \Generator
    {
        $producers = [
            function (array $excludedIds) use ($input) {
                return $this->jobRepository->findSucceededBefore(
                    $this->createRetentionCutoff($input, 'max-retention-succeeded'),
                    $excludedIds
                );
            },
            function (array $excludedIds) use ($input) {
                return $this->jobRepository->findFinishedBefore(
                    $this->createRetentionCutoff($input, 'max-retention'),
                    $excludedIds
                );
            },
            function (array $excludedIds) use ($input) {
                return $this->jobRepository->findCancelledBefore(
                    $this->createRetentionCutoff($input, 'max-retention'),
                    $excludedIds
                );
            },
        ];

        foreach ($producers as $producer) {
            // Re-yield instead of `yield from` so the outer generator keeps
            // continuous keys across the three passes.
            foreach ($this->whileResults($producer) as $job) {
                yield $job;
            }
        }
    }

    /**
     * Builds the cutoff date "now minus <option value>" for a retention option.
     *
     * getOption() is not guaranteed to return a string, so the value is checked
     * before it is formatted into the relative date expression.
     *
     * @param InputInterface $input
     * @param string $optionName name of the retention option to read
     *
     * @return \DateTime
     *
     * @throws \InvalidArgumentException when the option value is not a string
     */
    private function createRetentionCutoff(InputInterface $input, string $optionName): \DateTime
    {
        $retention = $input->getOption($optionName);
        if (!is_string($retention)) {
            throw new \InvalidArgumentException(sprintf(
                'The "--%s" option must be a string parsable by DateTime.',
                $optionName
            ));
        }

        return new \DateTime(sprintf('-%s', $retention));
    }
222
223
    /**
     * Repeatedly calls the producer and yields each job it returns, stopping
     * after the first call whose result is empty.
     *
     * The producer receives the IDs of all jobs yielded so far.
     *
     * @param callable $resultProducer receives the array of excluded IDs and returns a batch of jobs
     *
     * @return \Generator|JobInterface[]
     */
    private function whileResults(callable $resultProducer): \Generator
    {
        // Seeded with -1; presumably so the exclusion list is never empty — confirm against the repository.
        $seenIds = [-1];
        $hasMore = true;

        while ($hasMore) {
            /** @var JobInterface[] $batch */
            $batch = $resultProducer($seenIds);

            foreach ($batch as $job) {
                $seenIds[] = $job->getId();
                yield $job;
            }

            $hasMore = !empty($batch);
        }
    }
240
}
241