This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | /* |
||
4 | * Copyright 2012 Johannes M. Schmitt <[email protected]> |
||
5 | * |
||
6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
||
7 | * you may not use this file except in compliance with the License. |
||
8 | * You may obtain a copy of the License at |
||
9 | * |
||
10 | * http://www.apache.org/licenses/LICENSE-2.0 |
||
11 | * |
||
12 | * Unless required by applicable law or agreed to in writing, software |
||
13 | * distributed under the License is distributed on an "AS IS" BASIS, |
||
14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
15 | * See the License for the specific language governing permissions and |
||
16 | * limitations under the License. |
||
17 | */ |
||
18 | |||
19 | namespace JMS\JobQueueBundle\Entity\Repository; |
||
20 | |||
21 | use Doctrine\Common\Collections\ArrayCollection; |
||
22 | use Doctrine\Common\Util\ClassUtils; |
||
23 | use Doctrine\DBAL\Connection; |
||
24 | use Doctrine\ORM\EntityManager; |
||
25 | use Doctrine\ORM\Query\Parameter; |
||
26 | use Doctrine\ORM\Query\ResultSetMappingBuilder; |
||
27 | use Doctrine\Persistence\ManagerRegistry as PersistenceManagerRegistry; |
||
28 | use JMS\JobQueueBundle\Entity\Job; |
||
29 | use JMS\JobQueueBundle\Event\StateChangeEvent; |
||
30 | use JMS\JobQueueBundle\Retry\ExponentialRetryScheduler; |
||
31 | use JMS\JobQueueBundle\Retry\RetryScheduler; |
||
32 | use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
||
33 | |||
/**
 * Finds, locks and closes Job entities.
 *
 * All database access goes through the entity manager that the injected
 * registry reports for the Job class.
 */
class JobManager
{
    /** @var EventDispatcherInterface */
    private $dispatcher;

    /** @var PersistenceManagerRegistry */
    private $registry;

    /** @var RetryScheduler|null Lazily defaulted in closeJobInternal() when not injected. */
    private $retryScheduler;

    /**
     * @param PersistenceManagerRegistry $managerRegistry Registry used to resolve entity managers.
     * @param EventDispatcherInterface   $eventDispatcher Receives a StateChangeEvent when a job is closed.
     * @param RetryScheduler|null        $retryScheduler  Optional; an ExponentialRetryScheduler(5) is
     *                                                    created on first use when omitted.
     */
    public function __construct(PersistenceManagerRegistry $managerRegistry, EventDispatcherInterface $eventDispatcher, ?RetryScheduler $retryScheduler = null)
    {
        $this->registry = $managerRegistry;
        $this->dispatcher = $eventDispatcher;
        $this->retryScheduler = $retryScheduler;
    }

    /**
     * Returns the next pending job that is startable and could be locked for the given worker.
     *
     * Every candidate that is not startable, or whose lock could not be acquired, has its id
     * appended to $excludedIds (by reference, so the caller sees the additions) and is detached
     * from the entity manager before the next candidate is fetched.
     *
     * @param string $workerName       Name written to the job's workerName column as the lock.
     * @param array  $excludedIds      Job ids to skip; extended in place.
     * @param array  $excludedQueues   Queues whose jobs must not be returned.
     * @param array  $restrictedQueues If non-empty, only jobs of these queues are returned.
     *
     * @return Job|null The locked job, or null when no candidate is left.
     */
    public function findStartableJob($workerName, array &$excludedIds = array(), $excludedQueues = array(), $restrictedQueues = array())
    {
        while (null !== $job = $this->findPendingJob($excludedIds, $excludedQueues, $restrictedQueues)) {
            if ($job->isStartable() && $this->acquireLock($workerName, $job)) {
                return $job;
            }

            $excludedIds[] = $job->getId();

            // We do not want to have non-startable jobs floating around in
            // cache as they might be changed by another process. So, better
            // re-fetch them when they are not excluded anymore.
            $this->getJobManager()->getUnitOfWork()->detach($job); // ! detach is deprecated and has no replacement
        }

        return null;
    }

    /**
     * Tries to claim the job for a worker with a conditional UPDATE.
     *
     * The UPDATE only matches while workerName is still NULL, so the number of affected rows
     * tells whether this call won the claim. On success the in-memory entity is updated as well.
     *
     * @param string $workerName
     *
     * @return bool True if the row was updated, false otherwise.
     */
    private function acquireLock($workerName, Job $job)
    {
        $affectedRows = $this->getJobManager()->getConnection()->executeStatement(
            "UPDATE jms_jobs SET workerName = :worker WHERE id = :id AND workerName IS NULL",
            [
                'worker' => $workerName,
                'id' => $job->getId(),
            ]
        );

        if ($affectedRows <= 0) {
            return false;
        }

        $job->setWorkerName($workerName);

        return true;
    }

    /**
     * Returns all jobs linked to the given entity through jms_job_related_entities.
     *
     * @param object $relatedEntity
     *
     * @return Job[]
     */
    public function findAllForRelatedEntity($relatedEntity)
    {
        [$relClass, $relId] = $this->getRelatedEntityIdentifier($relatedEntity);

        $em = $this->getJobManager();
        $rsm = new ResultSetMappingBuilder($em);
        $rsm->addRootEntityFromClassMetadata('JMSJobQueueBundle:Job', 'j');

        $query = $em->createNativeQuery("SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId", $rsm);
        $query->setParameter('relClass', $relClass);
        $query->setParameter('relId', $relId);

        return $query->getResult();
    }

    /**
     * Same as findJobForRelatedEntity(), restricted to the running, pending and new states.
     *
     * @param string $command
     * @param object $relatedEntity
     *
     * @return Job|null
     */
    public function findOpenJobForRelatedEntity($command, $relatedEntity)
    {
        $openStates = [Job::STATE_RUNNING, Job::STATE_PENDING, Job::STATE_NEW];

        return $this->findJobForRelatedEntity($command, $relatedEntity, $openStates);
    }

    /**
     * Finds the job with the given command that is linked to the given entity.
     *
     * @param string   $command
     * @param object   $relatedEntity
     * @param string[] $states        Optional state filter; an empty array means "any state".
     *
     * @return Job|null The result of getOneOrNullResult() on the native query.
     */
    public function findJobForRelatedEntity($command, $relatedEntity, array $states = array())
    {
        [$relClass, $relId] = $this->getRelatedEntityIdentifier($relatedEntity);

        $em = $this->getJobManager();
        $rsm = new ResultSetMappingBuilder($em);
        $rsm->addRootEntityFromClassMetadata('JMSJobQueueBundle:Job', 'j');

        $sql = "SELECT j.* FROM jms_jobs j INNER JOIN jms_job_related_entities r ON r.job_id = j.id WHERE r.related_class = :relClass AND r.related_id = :relId AND j.command = :command";
        $params = new ArrayCollection();
        $params->add(new Parameter('command', $command));
        $params->add(new Parameter('relClass', $relClass));
        $params->add(new Parameter('relId', $relId));

        if (!empty($states)) {
            $sql .= " AND j.state IN (:states)";
            $params->add(new Parameter('states', $states, Connection::PARAM_STR_ARRAY));
        }

        return $em->createNativeQuery($sql, $rsm)
            ->setParameters($params)
            ->getOneOrNullResult();
    }

    /**
     * Builds the (class name, identifier) pair used to look up jobs related to an entity.
     *
     * The identifier values are sorted with asort() and JSON-encoded, so the second element of
     * the returned pair is a string.
     *
     * @param object $entity
     *
     * @return array{0: string, 1: string} Real class name and JSON-encoded identifier values.
     *
     * @throws \RuntimeException         If $entity is not an object.
     * @throws \InvalidArgumentException If the entity has no identifier values.
     */
    private function getRelatedEntityIdentifier($entity)
    {
        if (!is_object($entity)) {
            throw new \RuntimeException('$entity must be an object.');
        }

        // Make sure a proxy is loaded before its identifier values are read.
        if ($entity instanceof \Doctrine\Persistence\Proxy) {
            $entity->__load();
        }

        $relClass = ClassUtils::getClass($entity);
        $relId = $this->registry->getManagerForClass($relClass)->getMetadataFactory()
            ->getMetadataFor($relClass)->getIdentifierValues($entity);
        asort($relId);

        if (empty($relId)) {
            throw new \InvalidArgumentException(sprintf('The identifier for entity of class "%s" was empty.', $relClass));
        }

        return [$relClass, json_encode($relId)];
    }

    /**
     * Returns the first pending job that is not claimed by a worker and is due for execution.
     *
     * A job qualifies when workerName IS NULL, executeAfter is before "now" and its state is
     * Job::STATE_PENDING. Candidates are ordered by priority, then id (both ascending), and only
     * the first one is returned.
     *
     * @param array $excludedIds      Job ids to skip.
     * @param array $excludedQueues   Queues to skip.
     * @param array $restrictedQueues If non-empty, only these queues are considered.
     *
     * @return Job|null
     */
    public function findPendingJob(array $excludedIds = array(), array $excludedQueues = array(), array $restrictedQueues = array())
    {
        $qb = $this->getJobManager()->createQueryBuilder();
        $qb->select('j')->from('JMSJobQueueBundle:Job', 'j')
            ->orderBy('j.priority', 'ASC')
            ->addOrderBy('j.id', 'ASC');

        $expr = $qb->expr();

        $conditions = [
            $expr->isNull('j.workerName'),
            $expr->lt('j.executeAfter', ':now'),
            $expr->eq('j.state', ':state'),
        ];
        $qb->setParameter(':now', new \DateTime(), 'datetime');
        $qb->setParameter('state', Job::STATE_PENDING);

        if (!empty($excludedIds)) {
            $conditions[] = $expr->notIn('j.id', ':excludedIds');
            $qb->setParameter('excludedIds', $excludedIds, Connection::PARAM_INT_ARRAY);
        }

        if (!empty($excludedQueues)) {
            $conditions[] = $expr->notIn('j.queue', ':excludedQueues');
            $qb->setParameter('excludedQueues', $excludedQueues, Connection::PARAM_STR_ARRAY);
        }

        if (!empty($restrictedQueues)) {
            $conditions[] = $expr->in('j.queue', ':restrictedQueues');
            $qb->setParameter('restrictedQueues', $restrictedQueues, Connection::PARAM_STR_ARRAY);
        }

        $qb->where($expr->andX(...$conditions));

        return $qb->getQuery()->setMaxResults(1)->getOneOrNullResult();
    }

    /**
     * Closes a job (and whatever closeJobInternal() cascades to) inside one transaction.
     *
     * After a successful commit, every visited job that is closed non-successfully and is not
     * itself a retry job is detached from the entity manager so it can be garbage collected.
     *
     * @param string $finalState One of the Job::STATE_* values accepted by closeJobInternal().
     *
     * @throws \Throwable Anything thrown while closing is re-thrown after the rollback.
     */
    public function closeJob(Job $job, $finalState)
    {
        $em = $this->getJobManager();
        $connection = $em->getConnection();

        $connection->beginTransaction();
        try {
            $visited = [];
            $this->closeJobInternal($job, $finalState, $visited);
            $em->flush();
            $connection->commit();

            // Clean-up entity manager to allow for garbage collection to kick in.
            foreach ($visited as $visitedJob) {
                // If the job is an original job which is now being retried, let's
                // not remove it just yet.
                if (!$visitedJob->isClosedNonSuccessful() || $visitedJob->isRetryJob()) {
                    continue;
                }

                $em->getUnitOfWork()->detach($visitedJob); // ! detach is deprecated and has no replacement
            }
        } catch (\Throwable $ex) {
            $connection->rollback();

            throw $ex;
        }
    }

    /**
     * Applies the final state to a job and cascades to related jobs.
     *
     * - CANCELED: the job is canceled; a retry job cascades to its original job, any other job
     *   cascades to its incoming dependencies.
     * - FAILED / TERMINATED / INCOMPLETE: a retry job takes the state and cascades it to its
     *   original job; an original job is either retried (a new retry job is scheduled and the
     *   state is left untouched) or takes the state and cancels its pending/new incoming
     *   dependencies.
     * - FINISHED: the job, and the original job if this is a retry job, take the state.
     *
     * @param string $finalState Requested state; a StateChangeEvent listener may replace it.
     * @param Job[]  $visited    Jobs already processed in this closeJob() call; guards against
     *                           processing a job twice and feeds the clean-up in closeJob().
     *
     * @throws \LogicException If the (possibly listener-modified) state is not handled here.
     */
    private function closeJobInternal(Job $job, $finalState, array &$visited = array())
    {
        if (in_array($job, $visited, true)) {
            return;
        }
        $visited[] = $job;

        if ($job->isInFinalState()) {
            return;
        }

        if (null !== $this->dispatcher && ($job->isRetryJob() || 0 === count($job->getRetryJobs()))) {
            $event = new StateChangeEvent($job, $finalState);
            // NOTE(review): no event name is passed to dispatch(); presumably listeners are
            // registered under the event's class name. Confirm against the listener config.
            $this->dispatcher->dispatch($event);
            $finalState = $event->getNewState();
        }

        $em = $this->getJobManager();

        switch ($finalState) {
            case Job::STATE_CANCELED:
                $job->setState(Job::STATE_CANCELED);
                $em->persist($job);

                if ($job->isRetryJob()) {
                    $this->closeJobInternal($job->getOriginalJob(), Job::STATE_CANCELED, $visited);

                    return;
                }

                foreach ($this->findIncomingDependencies($job) as $dep) {
                    $this->closeJobInternal($dep, Job::STATE_CANCELED, $visited);
                }

                return;

            case Job::STATE_FAILED:
            case Job::STATE_TERMINATED:
            case Job::STATE_INCOMPLETE:
                if ($job->isRetryJob()) {
                    $job->setState($finalState);
                    $em->persist($job);

                    // Share $visited with the recursion so that jobs closed through the
                    // original job are covered by the cycle guard and by the clean-up in
                    // closeJob(), exactly like in the CANCELED branch.
                    $this->closeJobInternal($job->getOriginalJob(), $finalState, $visited);

                    return;
                }

                // The original job has failed, and we are allowed to retry it.
                if ($job->isRetryAllowed()) {
                    $retryJob = new Job($job->getCommand(), $job->getArgs(), true, $job->getQueue(), $job->getPriority());
                    $retryJob->setMaxRuntime($job->getMaxRuntime());

                    if (null === $this->retryScheduler) {
                        $this->retryScheduler = new ExponentialRetryScheduler(5);
                    }

                    $retryJob->setExecuteAfter($this->retryScheduler->scheduleNextRetry($job));

                    $job->addRetryJob($retryJob);
                    $em->persist($retryJob);
                    $em->persist($job);

                    return;
                }

                $job->setState($finalState);
                $em->persist($job);

                // The original job has failed, and no retries are allowed.
                foreach ($this->findIncomingDependencies($job) as $dep) {
                    // This is a safe-guard to avoid blowing up if there is a database inconsistency.
                    if (!$dep->isPending() && !$dep->isNew()) {
                        continue;
                    }

                    $this->closeJobInternal($dep, Job::STATE_CANCELED, $visited);
                }

                return;

            case Job::STATE_FINISHED:
                if ($job->isRetryJob()) {
                    $job->getOriginalJob()->setState($finalState);
                    $em->persist($job->getOriginalJob());
                }
                $job->setState($finalState);
                $em->persist($job);

                return;

            default:
                throw new \LogicException(sprintf('Non allowed state "%s" in closeJobInternal().', $finalState));
        }
    }

    /**
     * Loads the jobs returned by getJobIdsOfIncomingDependencies(), fetch-joining each job's
     * own "dependencies" association in the same query.
     *
     * @return Job[]
     */
    public function findIncomingDependencies(Job $job)
    {
        $jobIds = $this->getJobIdsOfIncomingDependencies($job);
        if (empty($jobIds)) {
            return [];
        }

        return $this->getJobManager()->createQuery("SELECT j, d FROM JMSJobQueueBundle:Job j LEFT JOIN j.dependencies d WHERE j.id IN (:ids)")
            ->setParameter('ids', $jobIds)
            ->getResult();
    }

    /**
     * Loads the jobs returned by getJobIdsOfIncomingDependencies() without joining their
     * "dependencies" association (compare findIncomingDependencies()).
     *
     * @return Job[]
     */
    public function getIncomingDependencies(Job $job)
    {
        $jobIds = $this->getJobIdsOfIncomingDependencies($job);
        if (empty($jobIds)) {
            return [];
        }

        return $this->getJobManager()->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.id IN (:ids)")
            ->setParameter('ids', $jobIds)
            ->getResult();
    }

    /**
     * Reads the source_job_id column of all jms_job_dependencies rows whose dest_job_id is the
     * given job's id (presumably: the ids of the jobs that depend on $job).
     *
     * @return array List of job ids.
     */
    private function getJobIdsOfIncomingDependencies(Job $job)
    {
        return $this->getJobManager()->getConnection()
            ->executeQuery("SELECT source_job_id FROM jms_job_dependencies WHERE dest_job_id = :id", ['id' => $job->getId()])
            ->fetchAll(\PDO::FETCH_COLUMN);
    }

    /**
     * Returns the most recently closed original jobs (no originalJob set) that ended in the
     * terminated or failed state, newest first.
     *
     * @param int $nbJobs Maximum number of jobs to return.
     *
     * @return Job[]
     */
    public function findLastJobsWithError($nbJobs = 10)
    {
        return $this->getJobManager()->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state IN (:errorStates) AND j.originalJob IS NULL ORDER BY j.closedAt DESC")
            ->setParameter('errorStates', [Job::STATE_TERMINATED, Job::STATE_FAILED])
            ->setMaxResults($nbJobs)
            ->getResult();
    }

    /**
     * Lists the distinct queue names that have at least one running, new or pending job.
     *
     * @return string[]
     */
    public function getAvailableQueueList()
    {
        $rows = $this->getJobManager()->createQuery("SELECT DISTINCT j.queue FROM JMSJobQueueBundle:Job j WHERE j.state IN (:availableStates) GROUP BY j.queue")
            ->setParameter('availableStates', [Job::STATE_RUNNING, Job::STATE_NEW, Job::STATE_PENDING])
            ->getResult();

        // Each row is an array with a single "queue" key; flatten to a plain list of names.
        return array_column($rows, 'queue');
    }

    /**
     * Tells whether the queue has at least one running, new or pending job.
     *
     * Despite the name this is not a full count: the query is limited to one row, so the
     * result is 1 when such a job exists and 0 otherwise.
     *
     * @param string $jobQueue
     *
     * @return int 0 or 1.
     */
    public function getAvailableJobsForQueueCount($jobQueue)
    {
        $result = $this->getJobManager()->createQuery("SELECT j.queue FROM JMSJobQueueBundle:Job j WHERE j.state IN (:availableStates) AND j.queue = :queue")
            ->setParameter('availableStates', [Job::STATE_RUNNING, Job::STATE_NEW, Job::STATE_PENDING])
            ->setParameter('queue', $jobQueue)
            ->setMaxResults(1)
            ->getOneOrNullResult();

        // getOneOrNullResult() yields null when nothing matches; count(null) raises a
        // TypeError on PHP 8, so the empty case is reported explicitly.
        if (null === $result) {
            return 0;
        }

        return count($result);
    }

    /**
     * Returns the entity manager the registry reports for the Job class.
     */
    private function getJobManager(): EntityManager
    {
        return $this->registry->getManagerForClass(Job::class);
    }
}
||
384 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.