1 | <?php |
||||||
2 | |||||||
3 | namespace Dtc\QueueBundle\Doctrine; |
||||||
4 | |||||||
5 | use Dtc\QueueBundle\Model\BaseJob; |
||||||
6 | use Dtc\QueueBundle\Model\StallableJob; |
||||||
7 | use Dtc\QueueBundle\Util\Util; |
||||||
8 | |||||||
9 | trait StalledTrait |
||||||
10 | { |
||||||
11 | /** |
||||||
12 | * @param int $if |
||||||
13 | * @param int $count |
||||||
14 | * @param int $saveCount |
||||||
15 | * |
||||||
16 | * @return int |
||||||
17 | */ |
||||||
18 | 2 | protected function runStalledLoop($i, $count, $saveCount, array $stalledJobs) |
|||||
19 | { |
||||||
20 | 2 | $resetCount = 0; |
|||||
21 | 2 | for ($j = $i, $max = $i + $saveCount; $j < $max && $j < $count; ++$j) { |
|||||
22 | /* StallableJob $job */ |
||||||
23 | 2 | $job = $stalledJobs[$j]; |
|||||
24 | 2 | $job->setStatus(StallableJob::STATUS_STALLED); |
|||||
25 | 2 | if ($this->saveHistory($job)) { |
|||||
0 ignored issues
–
show
Bug
introduced
by
![]() |
|||||||
26 | 2 | ++$resetCount; |
|||||
27 | } |
||||||
28 | } |
||||||
29 | |||||||
30 | 2 | return $resetCount; |
|||||
31 | } |
||||||
32 | |||||||
33 | /** |
||||||
34 | * @param int $limit |
||||||
35 | * @param int $offset |
||||||
36 | */ |
||||||
37 | 2 | private function resetJobsByCriterion( |
|||||
38 | array $criterion, |
||||||
39 | $limit, |
||||||
40 | $offset |
||||||
41 | ) { |
||||||
42 | 2 | $objectManager = $this->getObjectManager(); |
|||||
0 ignored issues
–
show
It seems like
getObjectManager() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
43 | 2 | $objectName = $this->getJobClass(); |
|||||
0 ignored issues
–
show
It seems like
getJobClass() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
44 | 2 | $archiveObjectName = $this->getJobArchiveClass(); |
|||||
0 ignored issues
–
show
It seems like
getJobArchiveClass() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
45 | 2 | $jobRepository = $objectManager->getRepository($objectName); |
|||||
46 | 2 | $jobArchiveRepository = $objectManager->getRepository($archiveObjectName); |
|||||
47 | 2 | $className = $jobRepository->getClassName(); |
|||||
48 | 2 | $metadata = $objectManager->getClassMetadata($className); |
|||||
49 | 2 | $this->stopIdGenerator($objectName); |
|||||
0 ignored issues
–
show
It seems like
stopIdGenerator() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
50 | 2 | $identifierData = $metadata->getIdentifier(); |
|||||
51 | 2 | $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id'; |
|||||
52 | 2 | $results = $jobArchiveRepository->findBy($criterion, [$idColumn => 'ASC'], $limit, $offset); |
|||||
53 | 2 | $countProcessed = 0; |
|||||
54 | |||||||
55 | 2 | foreach ($results as $jobArchive) { |
|||||
56 | 2 | $countProcessed += $this->resetArchiveJob($jobArchive); |
|||||
57 | } |
||||||
58 | 2 | $objectManager->flush(); |
|||||
59 | |||||||
60 | 2 | $this->restoreIdGenerator($objectName); |
|||||
0 ignored issues
–
show
It seems like
restoreIdGenerator() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
61 | |||||||
62 | 2 | return $countProcessed; |
|||||
63 | } |
||||||
64 | |||||||
65 | 4 | protected function getStalledJobs($workerName = null, $method = null) |
|||||
66 | { |
||||||
67 | 4 | $count = $this->countJobsByStatus($this->getJobClass(), BaseJob::STATUS_RUNNING, $workerName, $method); |
|||||
0 ignored issues
–
show
It seems like
countJobsByStatus() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
68 | |||||||
69 | 4 | $criterion = ['status' => BaseJob::STATUS_RUNNING]; |
|||||
70 | 4 | $this->addWorkerNameMethod($criterion, $workerName, $method); |
|||||
0 ignored issues
–
show
It seems like
addWorkerNameMethod() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
71 | |||||||
72 | 4 | $runningJobs = $this->findRunningJobs($criterion, $count); |
|||||
73 | |||||||
74 | 4 | return $this->extractStalledJobs($runningJobs); |
|||||
75 | } |
||||||
76 | |||||||
77 | /** |
||||||
78 | * @return int Number of jobs reset |
||||||
79 | */ |
||||||
80 | 2 | protected function resetArchiveJob(StallableJob $jobArchive) |
|||||
81 | { |
||||||
82 | 2 | $objectManager = $this->getObjectManager(); |
|||||
83 | 2 | if ($this->updateMaxStatus($jobArchive, StallableJob::STATUS_MAX_RETRIES, $jobArchive->getMaxRetries(), $jobArchive->getRetries())) { |
|||||
0 ignored issues
–
show
It seems like
updateMaxStatus() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
84 | 2 | $objectManager->persist($jobArchive); |
|||||
85 | |||||||
86 | 2 | return 0; |
|||||
87 | } |
||||||
88 | |||||||
89 | /** @var StallableJob $job */ |
||||||
90 | 2 | $className = $this->getJobClass(); |
|||||
91 | 2 | $newJob = new $className(); |
|||||
92 | 2 | Util::copy($jobArchive, $newJob); |
|||||
93 | 2 | $this->resetJob($newJob); |
|||||
0 ignored issues
–
show
The method
resetJob() does not exist on Dtc\QueueBundle\Doctrine\StalledTrait . Did you maybe mean resetJobsByCriterion() ?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces. This is most likely a typographical error or the method has been renamed. ![]() |
|||||||
94 | 2 | $objectManager->remove($jobArchive); |
|||||
95 | 2 | $this->flush(); |
|||||
0 ignored issues
–
show
It seems like
flush() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
96 | |||||||
97 | 2 | return 1; |
|||||
98 | } |
||||||
99 | |||||||
100 | 4 | protected function findRunningJobs($criterion, $count) |
|||||
101 | { |
||||||
102 | 4 | $repository = $this->getRepository(); |
|||||
0 ignored issues
–
show
It seems like
getRepository() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
103 | 4 | $runningJobsById = []; |
|||||
104 | |||||||
105 | 4 | $metadata = $this->getObjectManager()->getClassMetadata($this->getJobClass()); |
|||||
106 | 4 | $identifierData = $metadata->getIdentifier(); |
|||||
107 | 4 | $idColumn = isset($identifierData[0]) ? $identifierData[0] : 'id'; |
|||||
108 | |||||||
109 | 4 | $fetchCount = $this->getFetchCount($count); |
|||||
0 ignored issues
–
show
It seems like
getFetchCount() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
110 | 4 | for ($i = 0; $i < $count; $i += $fetchCount) { |
|||||
111 | 4 | $runningJobs = $repository->findBy($criterion, [$idColumn => 'ASC'], $fetchCount, $i); |
|||||
112 | 4 | if (!empty($runningJobs)) { |
|||||
113 | 4 | foreach ($runningJobs as $job) { |
|||||
114 | /** @var StallableJob $job */ |
||||||
115 | 4 | $runId = $job->getRunId(); |
|||||
116 | 4 | $runningJobsById[$runId][] = $job; |
|||||
117 | } |
||||||
118 | } |
||||||
119 | } |
||||||
120 | |||||||
121 | 4 | return $runningJobsById; |
|||||
122 | } |
||||||
123 | |||||||
124 | 4 | private function extractStalledJobsRunArchive(array $runningJobsById, array &$stalledJobs, $runId) |
|||||
125 | { |
||||||
126 | 4 | $runManager = $this->getRunManager(); |
|||||
0 ignored issues
–
show
It seems like
getRunManager() must be provided by classes using this trait. How about adding it as abstract method to this trait?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||||
127 | 4 | if (!method_exists($runManager, 'getObjectManager')) { |
|||||
128 | return; |
||||||
129 | } |
||||||
130 | 4 | if (!method_exists($runManager, 'getRunArchiveClass')) { |
|||||
131 | return; |
||||||
132 | } |
||||||
133 | |||||||
134 | /** @var EntityRepository|DocumentRepository $runArchiveRepository */ |
||||||
135 | 4 | $runArchiveRepository = $runManager->getObjectManager()->getRepository($runManager->getRunArchiveClass()); |
|||||
136 | /** @var Run $run */ |
||||||
137 | 4 | if ($run = $runArchiveRepository->find($runId)) { |
|||||
138 | 4 | if ($endTime = $run->getEndedAt()) { |
|||||
139 | // Did it end over an hour ago |
||||||
140 | 4 | if ((time() - $endTime->getTimestamp()) > static::STALLED_SECONDS) { |
|||||
0 ignored issues
–
show
|
|||||||
141 | 4 | $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]); |
|||||
142 | } |
||||||
143 | } |
||||||
144 | } |
||||||
145 | 4 | } |
|||||
146 | |||||||
147 | /** |
||||||
148 | * @return array |
||||||
149 | */ |
||||||
150 | 4 | private function extractStalledJobs(array $runningJobsById) |
|||||
151 | { |
||||||
152 | 4 | $stalledJobs = []; |
|||||
153 | 4 | foreach (array_keys($runningJobsById) as $runId) { |
|||||
154 | 4 | if (!$runId && 0 !== $runId) { |
|||||
155 | 2 | $stalledJobs = array_merge($stalledJobs, $runningJobsById[$runId]); |
|||||
156 | 2 | continue; |
|||||
157 | } |
||||||
158 | 4 | $this->extractStalledLiveRuns($runId, $runningJobsById[$runId], $stalledJobs); |
|||||
159 | 4 | $this->extractStalledJobsRunArchive($runningJobsById, $stalledJobs, $runId); |
|||||
160 | } |
||||||
161 | |||||||
162 | 4 | return $stalledJobs; |
|||||
163 | } |
||||||
164 | |||||||
165 | /** |
||||||
166 | * @param $runId |
||||||
167 | */ |
||||||
168 | 4 | private function extractStalledLiveRuns($runId, array $jobs, array &$stalledJobs) |
|||||
169 | { |
||||||
170 | 4 | $objectManager = $this->getObjectManager(); |
|||||
171 | 4 | $runRepository = $objectManager->getRepository($this->getRunManager()->getRunClass()); |
|||||
172 | 4 | if ($run = $runRepository->find($runId)) { |
|||||
173 | 2 | foreach ($jobs as $job) { |
|||||
174 | 2 | if ($run->getCurrentJobId() == $job->getId()) { |
|||||
175 | 2 | continue; |
|||||
176 | } |
||||||
177 | 2 | $stalledJobs[] = $job; |
|||||
178 | } |
||||||
179 | } |
||||||
180 | 4 | } |
|||||
181 | } |
||||||
182 |