This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace SfCod\QueueBundle\Worker; |
||
4 | |||
5 | use Exception; |
||
6 | use SfCod\QueueBundle\Event\JobExceptionOccurredEvent; |
||
7 | use SfCod\QueueBundle\Event\JobFailedEvent; |
||
8 | use SfCod\QueueBundle\Event\JobProcessedEvent; |
||
9 | use SfCod\QueueBundle\Event\JobProcessingEvent; |
||
10 | use SfCod\QueueBundle\Event\WorkerStoppingEvent; |
||
11 | use SfCod\QueueBundle\Exception\FatalThrowableException; |
||
12 | use SfCod\QueueBundle\Exception\MaxAttemptsExceededException; |
||
13 | use SfCod\QueueBundle\Failer\FailedJobProviderInterface; |
||
14 | use SfCod\QueueBundle\Handler\ExceptionHandlerInterface; |
||
15 | use SfCod\QueueBundle\Job\JobContractInterface; |
||
16 | use SfCod\QueueBundle\Queue\QueueInterface; |
||
17 | use SfCod\QueueBundle\Service\JobProcess; |
||
18 | use SfCod\QueueBundle\Service\QueueManager; |
||
19 | use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
||
20 | use Symfony\Component\Process\Process; |
||
21 | use Throwable; |
||
22 | |||
23 | /** |
||
24 | * Thread worker for job queues |
||
25 | * |
||
26 | * @author Virchenko Maksim <[email protected]> |
||
27 | */ |
||
28 | class Worker |
||
29 | { |
||
30 | /** |
||
31 | * Events |
||
32 | */ |
||
33 | const EVENT_RAISE_BEFORE_JOB = 'job_queue_worker.raise_before_job'; |
||
34 | const EVENT_RAISE_AFTER_JOB = 'job_queue_worker.raise_after_job'; |
||
35 | const EVENT_RAISE_EXCEPTION_OCCURED_JOB = 'job_queue_worker.raise_exception_occurred_job'; |
||
36 | const EVENT_RAISE_FAILED_JOB = 'job_queue_worker.raise_failed_job'; |
||
37 | const EVENT_STOP = 'job_queue_worker.stop'; |
||
38 | |||
39 | /** |
||
40 | * QueueManager instance |
||
41 | * |
||
42 | * @var QueueManager |
||
43 | */ |
||
44 | private $queueManager; |
||
45 | |||
46 | /** |
||
47 | * Logger instance |
||
48 | * |
||
49 | * @var ExceptionHandlerInterface |
||
50 | */ |
||
51 | private $exceptions; |
||
52 | |||
53 | /** |
||
54 | * Failer instance |
||
55 | * |
||
56 | * @var FailedJobProviderInterface |
||
57 | */ |
||
58 | private $failer; |
||
59 | |||
60 | /** |
||
61 | * @var EventDispatcherInterface |
||
62 | */ |
||
63 | private $dispatcher; |
||
64 | |||
65 | /** |
||
66 | * @var JobProcess |
||
67 | */ |
||
68 | private $process; |
||
69 | |||
70 | /** |
||
71 | * Worker constructor. |
||
72 | * |
||
73 | * @param QueueManager $queueManager |
||
74 | * @param JobProcess $process |
||
75 | * @param FailedJobProviderInterface $failer |
||
76 | * @param ExceptionHandlerInterface $exceptions |
||
77 | * @param EventDispatcherInterface $dispatcher |
||
78 | */ |
||
79 | public function __construct(QueueManager $queueManager, |
||
80 | JobProcess $process, |
||
81 | FailedJobProviderInterface $failer, |
||
82 | ExceptionHandlerInterface $exceptions, |
||
83 | EventDispatcherInterface $dispatcher) |
||
84 | { |
||
85 | $this->queueManager = $queueManager; |
||
86 | $this->process = $process; |
||
87 | $this->failer = $failer; |
||
88 | $this->exceptions = $exceptions; |
||
89 | $this->dispatcher = $dispatcher; |
||
90 | } |
||
91 | |||
92 | /** |
||
93 | * Listen to the given queue in a loop. |
||
94 | * |
||
95 | * @param string $connectionName |
||
96 | * @param string $queue |
||
97 | * @param Options $options |
||
98 | */ |
||
99 | public function daemon(string $connectionName, string $queue, Options $options) |
||
100 | { |
||
101 | while (true) { |
||
102 | if (false === $this->runNextJob($connectionName, $queue, $options)) { |
||
103 | $this->sleep($options->sleep); |
||
104 | } |
||
105 | |||
106 | if ($this->memoryExceeded($options->memory)) { |
||
107 | $this->stop(); |
||
108 | } |
||
109 | } |
||
110 | } |
||
111 | |||
112 | /** |
||
113 | * Process the next job on the queue. |
||
114 | * |
||
115 | * @param string $connectionName |
||
116 | * @param string $queue |
||
117 | * @param Options $options |
||
118 | * |
||
119 | * @return bool |
||
120 | */ |
||
121 | public function runNextJob(string $connectionName, string $queue, Options $options) |
||
122 | { |
||
123 | $connection = $this->queueManager->connection($connectionName); |
||
124 | $job = $this->getNextJob($connection, $queue); |
||
125 | |||
126 | // If we're able to pull a job off of the stack, we will process it and then return |
||
127 | // from this method. If there is no job on the queue, we will "sleep" the worker |
||
128 | // for the specified number of seconds, then keep processing jobs after sleep. |
||
129 | if ($job instanceof JobContractInterface && $connection->canRunJob($job)) { |
||
130 | $connection->markJobAsReserved($job); |
||
131 | $this->runInBackground($job, $options); |
||
132 | |||
133 | return true; |
||
134 | } |
||
135 | |||
136 | return false; |
||
137 | } |
||
138 | |||
139 | /** |
||
140 | * Process the next job on the queue. |
||
141 | * |
||
142 | * @param string $connectionName |
||
143 | * @param string $queue |
||
144 | * @param string $id |
||
145 | * @param Options $options |
||
146 | */ |
||
147 | public function runJobById(string $connectionName, string $queue, string $id, Options $options) |
||
148 | { |
||
149 | try { |
||
150 | $connection = $this->queueManager->connection($connectionName); |
||
151 | $job = $connection->getJobById($queue, $id); |
||
152 | |||
153 | // If we're able to pull a job off of the stack, we will process it and then return |
||
154 | // from this method. If there is no job on the queue, we will "sleep" the worker |
||
155 | // for the specified number of seconds, then keep processing jobs after sleep. |
||
156 | if ($job instanceof JobContractInterface) { |
||
157 | if (false === $job->reserved()) { |
||
158 | $connection->markJobAsReserved($job); |
||
159 | } |
||
160 | |||
161 | $this->process($connectionName, $job, $options); |
||
162 | |||
163 | return; |
||
164 | } |
||
165 | } catch (Exception $e) { |
||
166 | $this->exceptions->report($e); |
||
167 | } catch (Throwable $e) { |
||
168 | $this->exceptions->report(new FatalThrowableException($e)); |
||
0 ignored issues
–
show
|
|||
169 | } |
||
170 | |||
171 | $this->sleep($options->sleep); |
||
172 | } |
||
173 | |||
174 | /** |
||
175 | * Make a Process for the Artisan command for the job id. |
||
176 | * |
||
177 | * @param JobContractInterface $job |
||
178 | * @param Options $options |
||
179 | */ |
||
180 | public function runInBackground(JobContractInterface $job, Options $options) |
||
181 | { |
||
182 | $process = $this->process->getProcess($job, $options); |
||
183 | |||
184 | $process->run(); |
||
185 | } |
||
186 | |||
187 | /** Process the given job from the queue. |
||
188 | * |
||
189 | * @param string $connectionName |
||
190 | * @param JobContractInterface $job |
||
191 | * @param Options $options |
||
192 | * |
||
193 | * @return void |
||
194 | * |
||
195 | * @throws \Throwable |
||
196 | */ |
||
197 | public function process(string $connectionName, JobContractInterface $job, Options $options) |
||
198 | { |
||
199 | try { |
||
200 | // First we will raise the before job event and determine if the job has already ran |
||
201 | // over the its maximum attempt limit, which could primarily happen if the job is |
||
202 | // continually timing out and not actually throwing any exceptions from itself. |
||
203 | $this->raiseBeforeJobEvent($connectionName, $job); |
||
204 | |||
205 | $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( |
||
206 | $connectionName, $job, (int)$options->maxTries |
||
207 | ); |
||
208 | |||
209 | // Here we will fire off the job and let it process. We will catch any exceptions so |
||
210 | // they can be reported to the developers logs, etc. Once the job is finished the |
||
211 | // proper events will be fired to let any listeners know this job has finished. |
||
212 | $job->fire(); |
||
213 | |||
214 | $this->raiseAfterJobEvent($connectionName, $job); |
||
215 | } catch (Exception $e) { |
||
216 | $this->handleJobException($connectionName, $job, $options, $e); |
||
217 | } catch (Throwable $e) { |
||
218 | $this->handleJobException( |
||
219 | $connectionName, $job, $options, new FatalThrowableException($e) |
||
0 ignored issues
–
show
$e of type object<Throwable> is not a sub-type of object<Exception> . It seems like you assume a concrete implementation of the interface Throwable to be always present.
This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass. Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type. ![]() |
|||
220 | ); |
||
221 | } |
||
222 | } |
||
223 | |||
224 | /** |
||
225 | * Sleep the script for a given number of seconds. |
||
226 | * |
||
227 | * @param int $seconds |
||
228 | * |
||
229 | * @return void |
||
230 | */ |
||
231 | public function sleep(int $seconds) |
||
232 | { |
||
233 | sleep($seconds); |
||
234 | } |
||
235 | |||
236 | /** |
||
237 | * Determine if the memory limit has been exceeded. |
||
238 | * |
||
239 | * @param int $memoryLimit |
||
240 | * |
||
241 | * @return bool |
||
242 | */ |
||
243 | public function memoryExceeded(int $memoryLimit) |
||
244 | { |
||
245 | return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; |
||
246 | } |
||
247 | |||
248 | /** |
||
249 | * Stop listening and bail out of the script. |
||
250 | */ |
||
251 | public function stop() |
||
252 | { |
||
253 | $this->dispatcher->dispatch(new WorkerStoppingEvent(), self::EVENT_STOP); |
||
0 ignored issues
–
show
new \SfCod\QueueBundle\Event\WorkerStoppingEvent() is of type object<SfCod\QueueBundle...nt\WorkerStoppingEvent> , but the function expects a object<Symfony\Contracts\EventDispatcher\object> .
It seems like the type of the argument is not accepted by the function/method which you are calling. In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug. We suggest to add an explicit type cast like in the following example: function acceptsInteger($int) { }
$x = '123'; // string "123"
// Instead of
acceptsInteger($x);
// we recommend to use
acceptsInteger((integer) $x);
![]() |
|||
254 | |||
255 | exit(0); |
||
256 | } |
||
257 | |||
258 | /** |
||
259 | * Mark the given job as failed if it has exceeded the maximum allowed attempts. |
||
260 | * |
||
261 | * This will likely be because the job previously exceeded a timeout. |
||
262 | * |
||
263 | * @param string $connectionName |
||
264 | * @param JobContractInterface $job |
||
265 | * @param int $maxTries |
||
266 | * |
||
267 | * @return void |
||
268 | */ |
||
269 | protected function markJobAsFailedIfAlreadyExceedsMaxAttempts(string $connectionName, JobContractInterface $job, int $maxTries) |
||
270 | { |
||
271 | $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; |
||
272 | |||
273 | $timeoutAt = $job->timeoutAt(); |
||
274 | |||
275 | if ($timeoutAt && time() <= $timeoutAt) { |
||
0 ignored issues
–
show
The expression
$timeoutAt of type integer|null is loosely compared to true ; this is ambiguous if the integer can be zero. You might want to explicitly use !== null instead.
In PHP, under loose comparison (like For 0 == false // true
0 == null // true
123 == false // false
123 == null // false
// It is often better to use strict comparison
0 === false // false
0 === null // false
![]() |
|||
276 | return; |
||
277 | } |
||
278 | |||
279 | if (!$timeoutAt && (0 === $maxTries || $job->attempts() <= $maxTries)) { |
||
0 ignored issues
–
show
The expression
$timeoutAt of type integer|null is loosely compared to false ; this is ambiguous if the integer can be zero. You might want to explicitly use === null instead.
In PHP, under loose comparison (like For 0 == false // true
0 == null // true
123 == false // false
123 == null // false
// It is often better to use strict comparison
0 === false // false
0 === null // false
![]() |
|||
280 | return; |
||
281 | } |
||
282 | |||
283 | $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException( |
||
284 | 'A queued job has been attempted too many times or run too long. The job may have previously timed out.' |
||
285 | )); |
||
286 | |||
287 | throw $e; |
||
288 | } |
||
289 | |||
290 | /** |
||
291 | * Mark the given job as failed and raise the relevant event. |
||
292 | * |
||
293 | * @param string $connectionName |
||
294 | * @param JobContractInterface $job |
||
295 | * @param Exception $e |
||
296 | */ |
||
297 | protected function failJob(string $connectionName, JobContractInterface $job, Exception $e) |
||
298 | { |
||
299 | if ($job->isDeleted()) { |
||
300 | return; |
||
301 | } |
||
302 | |||
303 | try { |
||
304 | // If the job has failed, we will delete it, call the "failed" method and then call |
||
305 | // an event indicating the job has failed so it can be logged if needed. This is |
||
306 | // to allow every developer to better keep monitor of their failed queue jobs. |
||
307 | $job->delete(); |
||
308 | |||
309 | $job->failed($e); |
||
310 | } finally { |
||
311 | $this->failer->log($connectionName, $job->getQueue(), $job->getRawBody(), $e); |
||
312 | $this->raiseFailedJobEvent($connectionName, $job, $e); |
||
313 | } |
||
314 | } |
||
315 | |||
316 | /** |
||
317 | * Handle an exception that occurred while the job was running. |
||
318 | * |
||
319 | * @param string $connectionName |
||
320 | * @param JobContractInterface $job |
||
321 | * @param Options $options |
||
322 | * @param Exception $e |
||
323 | * |
||
324 | * @return void |
||
325 | * |
||
326 | * @throws Exception |
||
327 | */ |
||
328 | protected function handleJobException(string $connectionName, JobContractInterface $job, Options $options, Exception $e) |
||
329 | { |
||
330 | try { |
||
331 | // First, we will go ahead and mark the job as failed if it will exceed the maximum |
||
332 | // attempts it is allowed to run the next time we process it. If so we will just |
||
333 | // go ahead and mark it as failed now so we do not have to release this again. |
||
334 | if (!$job->hasFailed()) { |
||
0 ignored issues
–
show
The method
hasFailed() does not exist on SfCod\QueueBundle\Job\JobContractInterface . Did you maybe mean failed() ?
This check marks calls to methods that do not seem to exist on an object. This is most likely the result of a method being renamed without all references to it being renamed likewise. ![]() |
|||
335 | $this->markJobAsFailedIfWillExceedMaxAttempts( |
||
336 | $connectionName, $job, (int)$options->maxTries, $e |
||
337 | ); |
||
338 | } |
||
339 | |||
340 | $this->raiseExceptionOccurredJobEvent( |
||
341 | $connectionName, $job, $e |
||
342 | ); |
||
343 | } finally { |
||
344 | // If we catch an exception, we will attempt to release the job back onto the queue |
||
345 | // so it is not lost entirely. This'll let the job be retried at a later time by |
||
346 | // another listener (or this same one). We will re-throw this exception after. |
||
347 | if (!$job->isDeleted() && !$job->isReleased() && !$job->hasFailed()) { |
||
0 ignored issues
–
show
The method
isReleased() does not exist on SfCod\QueueBundle\Job\JobContractInterface . Did you maybe mean release() ?
This check marks calls to methods that do not seem to exist on an object. This is most likely the result of a method being renamed without all references to it being renamed likewise. ![]() The method
hasFailed() does not exist on SfCod\QueueBundle\Job\JobContractInterface . Did you maybe mean failed() ?
This check marks calls to methods that do not seem to exist on an object. This is most likely the result of a method being renamed without all references to it being renamed likewise. ![]() |
|||
348 | $job->release($options->delay); |
||
349 | } |
||
350 | } |
||
351 | |||
352 | throw $e; |
||
353 | } |
||
354 | |||
355 | /** |
||
356 | * Mark the given job as failed if it has exceeded the maximum allowed attempts. |
||
357 | * |
||
358 | * @param string $connectionName |
||
359 | * @param JobContractInterface $job |
||
360 | * @param int $maxTries |
||
361 | * @param Exception $e |
||
362 | * |
||
363 | * @return void |
||
364 | */ |
||
365 | protected function markJobAsFailedIfWillExceedMaxAttempts(string $connectionName, JobContractInterface $job, int $maxTries, Exception $e) |
||
366 | { |
||
367 | $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; |
||
368 | |||
369 | if ($job->timeoutAt() && $job->timeoutAt() <= time()) { |
||
0 ignored issues
–
show
The expression
$job->timeoutAt() of type integer|null is loosely compared to true ; this is ambiguous if the integer can be zero. You might want to explicitly use !== null instead.
In PHP, under loose comparison (like For 0 == false // true
0 == null // true
123 == false // false
123 == null // false
// It is often better to use strict comparison
0 === false // false
0 === null // false
![]() |
|||
370 | $this->failJob($connectionName, $job, $e); |
||
371 | } |
||
372 | |||
373 | if ($maxTries > 0 && $job->attempts() >= $maxTries) { |
||
374 | $this->failJob($connectionName, $job, $e); |
||
375 | } |
||
376 | } |
||
377 | |||
378 | /** |
||
379 | * Get the next job from the queue connection. |
||
380 | * |
||
381 | * @param QueueInterface $connection |
||
382 | * @param string $queue |
||
383 | * |
||
384 | * @return JobContractInterface|null |
||
385 | */ |
||
386 | protected function getNextJob(QueueInterface $connection, string $queue): ?JobContractInterface |
||
387 | { |
||
388 | try { |
||
389 | foreach (explode(',', $queue) as $queue) { |
||
390 | if (!is_null($job = $connection->pop($queue))) { |
||
391 | return $job; |
||
392 | } |
||
393 | } |
||
394 | } catch (Exception $e) { |
||
395 | $this->exceptions->report($e); |
||
396 | } catch (Throwable $e) { |
||
397 | $this->exceptions->report($e = new FatalThrowableException($e)); |
||
0 ignored issues
–
show
$e of type object<Throwable> is not a sub-type of object<Exception> . It seems like you assume a concrete implementation of the interface Throwable to be always present.
This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass. Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type. ![]() |
|||
398 | } |
||
399 | |||
400 | return null; |
||
401 | } |
||
402 | |||
403 | /** |
||
404 | * Raise the before queue job event. |
||
405 | * |
||
406 | * @param string $connectionName |
||
407 | * @param JobContractInterface $job |
||
408 | */ |
||
409 | protected function raiseBeforeJobEvent(string $connectionName, JobContractInterface $job) |
||
410 | { |
||
411 | $this->dispatcher->dispatch(new JobProcessingEvent($connectionName, $job), self::EVENT_RAISE_AFTER_JOB); |
||
0 ignored issues
–
show
new \SfCod\QueueBundle\E...($connectionName, $job) is of type object<SfCod\QueueBundle...ent\JobProcessingEvent> , but the function expects a object<Symfony\Contracts\EventDispatcher\object> .
It seems like the type of the argument is not accepted by the function/method which you are calling. In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug. We suggest to add an explicit type cast like in the following example: function acceptsInteger($int) { }
$x = '123'; // string "123"
// Instead of
acceptsInteger($x);
// we recommend to use
acceptsInteger((integer) $x);
![]() |
|||
412 | } |
||
413 | |||
414 | /** |
||
415 | * Raise the after queue job event. |
||
416 | * |
||
417 | * @param string $connectionName |
||
418 | * @param JobContractInterface $job |
||
419 | */ |
||
420 | protected function raiseAfterJobEvent(string $connectionName, JobContractInterface $job) |
||
421 | { |
||
422 | $this->dispatcher->dispatch(new JobProcessedEvent($connectionName, $job), self::EVENT_RAISE_AFTER_JOB); |
||
0 ignored issues
–
show
new \SfCod\QueueBundle\E...($connectionName, $job) is of type object<SfCod\QueueBundle\Event\JobProcessedEvent> , but the function expects a object<Symfony\Contracts\EventDispatcher\object> .
It seems like the type of the argument is not accepted by the function/method which you are calling. In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug. We suggest to add an explicit type cast like in the following example: function acceptsInteger($int) { }
$x = '123'; // string "123"
// Instead of
acceptsInteger($x);
// we recommend to use
acceptsInteger((integer) $x);
![]() |
|||
423 | } |
||
424 | |||
425 | /** |
||
426 | * Raise the exception occurred queue job event. |
||
427 | * |
||
428 | * @param string $connectionName |
||
429 | * @param JobContractInterface $job |
||
430 | * @param Exception $e |
||
431 | */ |
||
432 | protected function raiseExceptionOccurredJobEvent(string $connectionName, JobContractInterface $job, Exception $e) |
||
433 | { |
||
434 | $this->dispatcher->dispatch(new JobExceptionOccurredEvent($connectionName, $job, $e), self::EVENT_RAISE_EXCEPTION_OCCURED_JOB); |
||
0 ignored issues
–
show
new \SfCod\QueueBundle\E...nnectionName, $job, $e) is of type object<SfCod\QueueBundle...ExceptionOccurredEvent> , but the function expects a object<Symfony\Contracts\EventDispatcher\object> .
It seems like the type of the argument is not accepted by the function/method which you are calling. In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug. We suggest to add an explicit type cast like in the following example: function acceptsInteger($int) { }
$x = '123'; // string "123"
// Instead of
acceptsInteger($x);
// we recommend to use
acceptsInteger((integer) $x);
![]() |
|||
435 | } |
||
436 | |||
437 | /** |
||
438 | * Raise the failed queue job event. |
||
439 | * |
||
440 | * @param string $connectionName |
||
441 | * @param JobContractInterface $job |
||
442 | * @param Exception $e |
||
443 | */ |
||
444 | protected function raiseFailedJobEvent(string $connectionName, JobContractInterface $job, Exception $e) |
||
445 | { |
||
446 | $this->dispatcher->dispatch(new JobFailedEvent($connectionName, $job, $e), self::EVENT_RAISE_FAILED_JOB); |
||
0 ignored issues
–
show
new \SfCod\QueueBundle\E...nnectionName, $job, $e) is of type object<SfCod\QueueBundle\Event\JobFailedEvent> , but the function expects a object<Symfony\Contracts\EventDispatcher\object> .
It seems like the type of the argument is not accepted by the function/method which you are calling. In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug. We suggest to add an explicit type cast like in the following example: function acceptsInteger($int) { }
$x = '123'; // string "123"
// Instead of
acceptsInteger($x);
// we recommend to use
acceptsInteger((integer) $x);
![]() |
|||
447 | } |
||
448 | } |
||
449 |
This check looks for parameters that are defined as one type in their type hint or doc comment but seem to be used as a narrower type, i.e an implementation of an interface or a subclass.
Consider changing the type of the parameter or doing an instanceof check before assuming your parameter is of the expected type.