JobManager::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 12
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nc 1
nop 5
dl 0
loc 12
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Setono\SyliusSchedulerPlugin\JobManager;
6
7
use Doctrine\ORM\EntityManager;
8
use Setono\SyliusSchedulerPlugin\Doctrine\ORM\JobRepositoryInterface;
9
use Setono\SyliusSchedulerPlugin\Event\StateChangeEvent;
10
use Setono\SyliusSchedulerPlugin\Factory\JobFactoryInterface;
11
use Setono\SyliusSchedulerPlugin\Model\JobInterface;
12
use Setono\SyliusSchedulerPlugin\Retry\RetrySchedulerInterface;
13
use Setono\SyliusSchedulerPlugin\SetonoSyliusSchedulerPluginEvent;
14
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
15
16
/**
 * Moves jobs into a final state.
 *
 * Closing a job may also create a retry job, propagate the final state to
 * the original job of a retry job, and cancel jobs returned by the
 * repository as incoming dependencies of the closed job.
 */
class JobManager
{
    /**
     * @var JobRepositoryInterface
     */
    private $jobRepository;

    /**
     * @var JobFactoryInterface
     */
    private $jobFactory;

    /**
     * @var EntityManager
     */
    private $entityManager;

    /**
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * @var RetrySchedulerInterface
     */
    private $retryScheduler;

    /**
     * @param JobRepositoryInterface $jobRepository Used to look up incoming dependencies of a job
     * @param JobFactoryInterface $jobFactory Used to create retry jobs
     * @param EntityManager $entityManager Used for persistence and transaction handling
     * @param EventDispatcherInterface $eventDispatcher Receives the job state change event
     * @param RetrySchedulerInterface $retryScheduler Computes the "execute after" value of a retry job
     */
    public function __construct(
        JobRepositoryInterface $jobRepository,
        JobFactoryInterface $jobFactory,
        EntityManager $entityManager,
        EventDispatcherInterface $eventDispatcher,
        RetrySchedulerInterface $retryScheduler
    ) {
        $this->jobRepository = $jobRepository;
        $this->jobFactory = $jobFactory;
        $this->entityManager = $entityManager;
        $this->eventDispatcher = $eventDispatcher;
        $this->retryScheduler = $retryScheduler;
    }

    /**
     * Closes the given job (and every related job that has to follow it)
     * inside a single database transaction.
     *
     * On any failure the transaction is rolled back and the original
     * throwable is rethrown unchanged.
     *
     * @param JobInterface $job
     * @param string $finalState One of the JobInterface::STATE_* values accepted by closeJobInternal()
     *
     * @throws \Throwable Whatever was thrown while closing, flushing or committing
     */
    public function closeJob(JobInterface $job, string $finalState): void
    {
        $connection = $this->entityManager->getConnection();
        $connection->beginTransaction();

        $visitedJobs = [];

        try {
            $this->closeJobInternal($job, $finalState, $visitedJobs);
            $this->entityManager->flush();
            $connection->commit();
        } catch (\Throwable $ex) {
            // \Throwable (not just \Exception) so that an \Error, e.g. a
            // \TypeError under strict_types, does not leave the transaction open.
            $connection->rollBack();

            throw $ex;
        }

        // Clean-up entity manager to allow for garbage collection to kick in.
        // This runs after the try/catch on purpose: the transaction is already
        // committed here, so a failure must not trigger a rollback.
        foreach ($visitedJobs as $visitedJob) {
            // If the job is an original job which is now being retried, let's
            // not remove it just yet.
            if (!$visitedJob->isClosedNonSuccessful() || $visitedJob->isRetryJob()) {
                continue;
            }

            $this->entityManager->detach($visitedJob);
        }
    }

    /**
     * Applies the final state to a single job and recurses into the jobs
     * that have to be closed as a consequence.
     *
     * Jobs that were already visited, or that are already in a final state,
     * are left untouched.
     *
     * @param JobInterface $job
     * @param string $finalState Requested state; a state change listener may replace it
     * @param array $visited Jobs handled so far in this close operation (shared by reference across the recursion)
     *
     * @throws \LogicException When the (possibly listener-provided) state is not a supported final state
     */
    private function closeJobInternal(JobInterface $job, string $finalState, array &$visited = []): void
    {
        if (\in_array($job, $visited, true)) {
            return;
        }
        $visited[] = $job;

        if ($job->isInFinalState()) {
            return;
        }

        // The event is dispatched for retry jobs and for jobs without any retry job;
        // listeners may override the final state through the event.
        if ($job->isRetryJob() || 0 === \count($job->getRetryJobs())) {
            $event = new StateChangeEvent($job, $finalState);
            $this->eventDispatcher->dispatch(SetonoSyliusSchedulerPluginEvent::JOB_STATE_CHANGED, $event);
            $finalState = $event->getNewState();
        }

        switch ($finalState) {
            case JobInterface::STATE_CANCELED:
                $job->setState(JobInterface::STATE_CANCELED);
                $this->entityManager->persist($job);

                // Cancelling a retry job cancels its original job as well.
                if ($job->isRetryJob()) {
                    $this->closeJobInternal($job->getOriginalJob(), JobInterface::STATE_CANCELED, $visited);

                    return;
                }

                foreach ($this->jobRepository->findIncomingDependencies($job) as $dep) {
                    $this->closeJobInternal($dep, JobInterface::STATE_CANCELED, $visited);
                }

                return;
            case JobInterface::STATE_FAILED:
            case JobInterface::STATE_TERMINATED:
            case JobInterface::STATE_INCOMPLETE:
                if ($job->isRetryJob()) {
                    $job->setState($finalState);
                    $this->entityManager->persist($job);

                    // Share the visited list so that jobs closed through the original
                    // job are tracked (and later detached) like in every other branch.
                    $this->closeJobInternal($job->getOriginalJob(), $finalState, $visited);

                    return;
                }

                // The original job has failed, and we are allowed to retry it.
                // Its own state is left unchanged here; only a retry job is added.
                if ($job->isRetryAllowed()) {
                    $retryJob = $this->jobFactory->createRetryJob($job);
                    $retryJob->setExecuteAfter(
                        $this->retryScheduler->scheduleNextRetry($job)
                    );

                    $job->addRetryJob($retryJob);
                    $this->entityManager->persist($retryJob);
                    $this->entityManager->persist($job);

                    return;
                }

                $job->setState($finalState);
                $this->entityManager->persist($job);

                // The original job has failed, and no retries are allowed.
                /** @var JobInterface $dep */
                foreach ($this->jobRepository->findIncomingDependencies($job) as $dep) {
                    // This is a safe-guard to avoid blowing up if there is a database inconsistency.
                    if (!$dep->isPending() && !$dep->isNew()) {
                        continue;
                    }

                    $this->closeJobInternal($dep, JobInterface::STATE_CANCELED, $visited);
                }

                return;
            case JobInterface::STATE_FINISHED:
                // A finished retry job also marks its original job with the same state.
                if ($job->isRetryJob()) {
                    $job->getOriginalJob()->setState($finalState);
                    $this->entityManager->persist(
                        $job->getOriginalJob()
                    );
                }
                $job->setState($finalState);
                $this->entityManager->persist($job);

                return;
            default:
                throw new \LogicException(\sprintf(
                    'Non allowed state "%s" in closeJobInternal().',
                    $finalState
                ));
        }
    }
}
193