This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace yiicod\jobqueue; |
||
4 | |||
5 | use Exception; |
||
6 | use Illuminate\Contracts\Debug\ExceptionHandler; |
||
7 | use Illuminate\Queue\Failed\FailedJobProviderInterface; |
||
8 | use Illuminate\Queue\Jobs\Job; |
||
9 | use Illuminate\Queue\MaxAttemptsExceededException; |
||
10 | use Illuminate\Queue\QueueManager; |
||
11 | use Illuminate\Queue\WorkerOptions; |
||
12 | use Throwable; |
||
13 | use Yii; |
||
14 | use yii\base\Event; |
||
15 | use yiicod\jobqueue\base\FatalThrowableError; |
||
16 | use yiicod\jobqueue\events\JobExceptionOccurredEvent; |
||
17 | use yiicod\jobqueue\events\JobFailedEvent; |
||
18 | use yiicod\jobqueue\events\JobProcessedEvent; |
||
19 | use yiicod\jobqueue\events\JobProcessingEvent; |
||
20 | use yiicod\jobqueue\events\WorkerStoppingEvent; |
||
21 | use yiicod\jobqueue\queues\MongoThreadQueue; |
||
22 | |||
23 | /** |
||
24 | * Worker for laravel queues |
||
25 | * |
||
26 | * @author Orlov Alexey <[email protected]> |
||
27 | * @author Virchenko Maksim <[email protected]> |
||
28 | */ |
||
29 | class Worker |
||
30 | { |
||
31 | /** |
||
32 | * Events |
||
33 | */ |
||
34 | const EVENT_RAISE_BEFORE_JOB = 'raiseBeforeJobEvent'; |
||
35 | const EVENT_RAISE_AFTER_JOB = 'raiseAfterJobEvent'; |
||
36 | const EVENT_RAISE_EXCEPTION_OCCURED_JOB = 'raiseExceptionOccurredJobEvent'; |
||
37 | const EVENT_RAISE_FAILED_JOB = 'raiseFailedJobEvent'; |
||
38 | const EVENT_STOP = 'stop'; |
||
39 | /** |
||
40 | * The queue manager instance. |
||
41 | * |
||
42 | * @var \Illuminate\Queue\QueueManager |
||
43 | */ |
||
44 | protected $manager; |
||
45 | |||
46 | /** |
||
47 | * Failer instance |
||
48 | * |
||
49 | * @var FailedJobProviderInterface |
||
50 | */ |
||
51 | protected $failer; |
||
52 | |||
53 | /** |
||
54 | * The exception handler instance. |
||
55 | * |
||
56 | * @var \Illuminate\Foundation\Exceptions\Handler |
||
57 | */ |
||
58 | protected $exceptions; |
||
59 | |||
60 | protected $shouldQuit = false; |
||
61 | |||
62 | /** |
||
63 | * Create a new queue worker. |
||
64 | * |
||
65 | * @param QueueManager $manager |
||
66 | * @param FailedJobProviderInterface $failer |
||
67 | * @param ExceptionHandler $exceptions |
||
68 | */ |
||
69 | public function __construct(QueueManager $manager, |
||
70 | FailedJobProviderInterface $failer, |
||
71 | ExceptionHandler $exceptions) |
||
72 | { |
||
73 | $this->manager = $manager; |
||
74 | $this->failer = $failer; |
||
75 | $this->exceptions = $exceptions; |
||
0 ignored issues
–
show
|
|||
76 | } |
||
77 | |||
78 | /** |
||
79 | * Listen to the given queue in a loop. |
||
80 | * |
||
81 | * @param string $connectionName |
||
82 | * @param string $queue |
||
83 | * @param WorkerOptions $options |
||
84 | * |
||
85 | * @return array |
||
86 | */ |
||
87 | public function daemon($connectionName, $queue, WorkerOptions $options) |
||
88 | { |
||
89 | while (true) { |
||
90 | if ($this->shouldQuit) { |
||
91 | $this->kill(); |
||
92 | } |
||
93 | |||
94 | if (false === $this->runNextJob($connectionName, $queue, $options)) { |
||
95 | $this->sleep($options->sleep); |
||
96 | } |
||
97 | |||
98 | if ($this->memoryExceeded($options->memory)) { |
||
99 | $this->stop(); |
||
100 | } |
||
101 | } |
||
102 | } |
||
103 | |||
104 | /** |
||
105 | * Process the next job on the queue. |
||
106 | * |
||
107 | * @param string $connectionName |
||
108 | * @param string $queue |
||
109 | * @param \Illuminate\Queue\WorkerOptions $options |
||
110 | */ |
||
111 | public function runNextJob($connectionName, $queue, WorkerOptions $options) |
||
0 ignored issues
–
show
|
|||
112 | { |
||
113 | /** @var MongoThreadQueue|Queue $connection */ |
||
114 | $connection = $this->manager->connection($connectionName); |
||
115 | $job = $this->getNextJob($connection, $queue); |
||
116 | |||
117 | // If we're able to pull a job off of the stack, we will process it and then return |
||
118 | // from this method. If there is no job on the queue, we will "sleep" the worker |
||
119 | // for the specified number of seconds, then keep processing jobs after sleep. |
||
120 | if ($job instanceof Job && $connection->canRunJob($job)) { |
||
121 | // If job can be run, then markJobAsReserved and run process |
||
122 | $connection->markJobAsReserved($job); |
||
123 | |||
124 | $this->runInBackground($job, $connectionName); |
||
125 | |||
126 | return true; |
||
127 | } |
||
128 | |||
129 | return false; |
||
130 | } |
||
131 | |||
132 | /** |
||
133 | * Make a Process for the Artisan command for the job id. |
||
134 | * |
||
135 | * @param Job $job |
||
136 | * @param string $connectionName |
||
137 | */ |
||
138 | protected function runInBackground(Job $job, string $connectionName) |
||
139 | { |
||
140 | $process = Yii::$container->get(JobProcess::class)->getProcess($job, $connectionName); |
||
141 | |||
142 | $process->run(); |
||
143 | } |
||
144 | |||
145 | /** |
||
146 | * Process the given job from the queue. |
||
147 | * |
||
148 | * @param string $connectionName |
||
149 | * @param \Illuminate\Contracts\Queue\Job $job |
||
150 | * @param \Illuminate\Queue\WorkerOptions $options |
||
151 | * |
||
152 | * @throws \Throwable |
||
153 | */ |
||
154 | public function process($connectionName, $job, WorkerOptions $options) |
||
155 | { |
||
156 | try { |
||
157 | // First we will raise the before job event and determine if the job has already ran |
||
158 | // over the its maximum attempt limit, which could primarily happen if the job is |
||
159 | // continually timing out and not actually throwing any exceptions from itself. |
||
160 | $this->raiseBeforeJobEvent($connectionName, $job); |
||
161 | |||
162 | $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( |
||
163 | $connectionName, $job, (int)$options->maxTries |
||
164 | ); |
||
165 | |||
166 | // Here we will fire off the job and let it process. We will catch any exceptions so |
||
167 | // they can be reported to the developers logs, etc. Once the job is finished the |
||
168 | // proper events will be fired to let any listeners know this job has finished. |
||
169 | $job->fire(); |
||
170 | |||
171 | $this->raiseAfterJobEvent($connectionName, $job); |
||
172 | } catch (Exception $e) { |
||
173 | $this->handleJobException($connectionName, $job, $options, $e); |
||
174 | } catch (Throwable $e) { |
||
0 ignored issues
–
show
|
|||
175 | $this->handleJobException( |
||
176 | $connectionName, $job, $options, new FatalThrowableError($e) |
||
177 | ); |
||
178 | } |
||
179 | } |
||
180 | |||
181 | /** |
||
182 | * Handle an exception that occurred while the job was running. |
||
183 | * |
||
184 | * @param string $connectionName |
||
185 | * @param \Illuminate\Contracts\Queue\Job $job |
||
186 | * @param WorkerOptions $options |
||
187 | * @param \Exception $e |
||
188 | * |
||
189 | * @throws \Exception |
||
190 | */ |
||
191 | protected function handleJobException($connectionName, $job, WorkerOptions $options, $e) |
||
192 | { |
||
193 | try { |
||
194 | // First, we will go ahead and mark the job as failed if it will exceed the maximum |
||
195 | // attempts it is allowed to run the next time we process it. If so we will just |
||
196 | // go ahead and mark it as failed now so we do not have to release this again. |
||
197 | if (!$job->hasFailed()) { |
||
0 ignored issues
–
show
The method
hasFailed() does not exist on Illuminate\Contracts\Queue\Job . Did you maybe mean failed() ?
This check marks calls to methods that do not seem to exist on an object. This is most likely the result of a method being renamed without all references to it being renamed likewise. ![]() |
|||
198 | $this->markJobAsFailedIfWillExceedMaxAttempts( |
||
199 | $connectionName, $job, (int)$options->maxTries, $e |
||
200 | ); |
||
201 | } |
||
202 | |||
203 | $this->raiseExceptionOccurredJobEvent( |
||
204 | $connectionName, $job, $e |
||
205 | ); |
||
206 | } finally { |
||
207 | // If we catch an exception, we will attempt to release the job back onto the queue |
||
208 | // so it is not lost entirely. This'll let the job be retried at a later time by |
||
209 | // another listener (or this same one). We will re-throw this exception after. |
||
210 | if (!$job->isDeleted() && !$job->isReleased() && !$job->hasFailed()) { |
||
0 ignored issues
–
show
The method
isReleased() does not exist on Illuminate\Contracts\Queue\Job . Did you maybe mean release() ?
This check marks calls to methods that do not seem to exist on an object. This is most likely the result of a method being renamed without all references to it being renamed likewise. ![]() The method
hasFailed() does not exist on Illuminate\Contracts\Queue\Job . Did you maybe mean failed() ?
This check marks calls to methods that do not seem to exist on an object. This is most likely the result of a method being renamed without all references to it being renamed likewise. ![]() |
|||
211 | $job->release($options->delay); |
||
212 | } |
||
213 | } |
||
214 | |||
215 | throw $e; |
||
216 | } |
||
217 | |||
218 | /** |
||
219 | * Mark the given job as failed if it has exceeded the maximum allowed attempts. |
||
220 | * |
||
221 | * This will likely be because the job previously exceeded a timeout. |
||
222 | * |
||
223 | * @param string $connectionName |
||
224 | * @param \Illuminate\Contracts\Queue\Job $job |
||
225 | * @param int $maxTries |
||
226 | */ |
||
227 | protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries) |
||
228 | { |
||
229 | $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; |
||
230 | |||
231 | if (0 === $maxTries || $job->attempts() <= $maxTries) { |
||
232 | return; |
||
233 | } |
||
234 | |||
235 | $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException( |
||
236 | 'A queued job has been attempted too many times. The job may have previously timed out.' |
||
237 | )); |
||
238 | |||
239 | throw $e; |
||
240 | } |
||
241 | |||
242 | /** |
||
243 | * Mark the given job as failed if it has exceeded the maximum allowed attempts. |
||
244 | * |
||
245 | * @param string $connectionName |
||
246 | * @param \Illuminate\Contracts\Queue\Job $job |
||
247 | * @param int $maxTries |
||
248 | * @param \Exception $e |
||
249 | */ |
||
250 | protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e) |
||
251 | { |
||
252 | $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; |
||
253 | |||
254 | if ($maxTries > 0 && $job->attempts() >= $maxTries) { |
||
255 | $this->failJob($connectionName, $job, $e); |
||
256 | } |
||
257 | } |
||
258 | |||
259 | /** |
||
260 | * Get the next job from the queue connection. |
||
261 | * |
||
262 | * @param \Illuminate\Contracts\Queue\Queue $connection |
||
263 | * @param string $queue |
||
264 | * |
||
265 | * @return \Illuminate\Contracts\Queue\Job|null |
||
266 | */ |
||
267 | protected function getNextJob($connection, $queue) |
||
268 | { |
||
269 | try { |
||
270 | foreach (explode(',', $queue) as $queue) { |
||
271 | if (!is_null($job = $connection->pop($queue))) { |
||
272 | return $job; |
||
273 | } |
||
274 | } |
||
275 | } catch (Exception $e) { |
||
276 | $this->exceptions->report($e); |
||
277 | } catch (Throwable $e) { |
||
0 ignored issues
–
show
|
|||
278 | $this->exceptions->report($e = new FatalThrowableError($e)); |
||
279 | } |
||
280 | } |
||
281 | |||
282 | /** |
||
283 | * Kill the process. |
||
284 | * |
||
285 | * @param int $status |
||
286 | */ |
||
287 | public function kill($status = 0) |
||
288 | { |
||
289 | if (extension_loaded('posix')) { |
||
290 | posix_kill(getmypid(), SIGKILL); |
||
291 | } |
||
292 | |||
293 | exit($status); |
||
294 | } |
||
295 | |||
296 | /** |
||
297 | * Sleep the script for a given number of seconds. |
||
298 | * |
||
299 | * @param int $seconds |
||
300 | */ |
||
301 | public function sleep($seconds) |
||
302 | { |
||
303 | sleep($seconds); |
||
304 | } |
||
305 | |||
306 | /** |
||
307 | * Process the next job on the queue. |
||
308 | * |
||
309 | * @param string $connectionName |
||
310 | * @param $id |
||
311 | * @param \Illuminate\Queue\WorkerOptions $options |
||
312 | */ |
||
313 | public function runJobById($connectionName, $id, WorkerOptions $options) |
||
314 | { |
||
315 | try { |
||
316 | $job = $this->manager->connection($connectionName)->getJobById($id); |
||
0 ignored issues
–
show
It seems like you code against a concrete implementation and not the interface
Illuminate\Contracts\Queue\Queue as the method getJobById() does only exist in the following implementations of said interface: yiicod\jobqueue\queues\MongoThreadQueue .
Let’s take a look at an example: interface User
{
/** @return string */
public function getPassword();
}
class MyUser implements User
{
public function getPassword()
{
// return something
}
public function getDisplayName()
{
// return some name.
}
}
class AuthSystem
{
public function authenticate(User $user)
{
$this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
// do something.
}
}
In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break. Available Fixes
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types
inside the if block in such a case.
![]() |
|||
317 | |||
318 | // If we're able to pull a job off of the stack, we will process it and then return |
||
319 | // from this method. If there is no job on the queue, we will "sleep" the worker |
||
320 | // for the specified number of seconds, then keep processing jobs after sleep. |
||
321 | if ($job) { |
||
322 | return $this->process($connectionName, $job, $options); |
||
323 | } |
||
324 | } catch (Exception $e) { |
||
325 | $this->exceptions->report($e); |
||
326 | } catch (Throwable $e) { |
||
0 ignored issues
–
show
|
|||
327 | $this->exceptions->report(new FatalThrowableError($e)); |
||
328 | } |
||
329 | |||
330 | $this->sleep($options->sleep); |
||
331 | } |
||
332 | |||
333 | /** |
||
334 | * Mark the given job as failed and raise the relevant event. |
||
335 | * |
||
336 | * @param string $connectionName |
||
337 | * @param \Illuminate\Contracts\Queue\Job $job |
||
338 | * @param \Exception $e |
||
339 | */ |
||
340 | protected function failJob($connectionName, $job, $e) |
||
341 | { |
||
342 | if ($job->isDeleted()) { |
||
343 | return; |
||
344 | } |
||
345 | |||
346 | try { |
||
347 | // If the job has failed, we will delete it, call the "failed" method and then call |
||
348 | // an event indicating the job has failed so it can be logged if needed. This is |
||
349 | // to allow every developer to better keep monitor of their failed queue jobs. |
||
350 | $job->delete(); |
||
351 | |||
352 | $job->failed($e); |
||
0 ignored issues
–
show
$e is of type object<Exception> , but the function expects a object<Throwable> .
It seems like the type of the argument is not accepted by the function/method which you are calling. In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug. We suggest to add an explicit type cast like in the following example: function acceptsInteger($int) { }
$x = '123'; // string "123"
// Instead of
acceptsInteger($x);
// we recommend to use
acceptsInteger((integer) $x);
![]() |
|||
353 | } finally { |
||
354 | $this->failer->log($connectionName, $job->getQueue(), $job->getRawBody(), $e); |
||
355 | $this->raiseFailedJobEvent($connectionName, $job, $e); |
||
356 | } |
||
357 | } |
||
358 | |||
359 | /** |
||
360 | * Determine if the memory limit has been exceeded. |
||
361 | * |
||
362 | * @param int $memoryLimit |
||
363 | * |
||
364 | * @return bool |
||
365 | */ |
||
366 | public function memoryExceeded($memoryLimit) |
||
367 | { |
||
368 | return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; |
||
369 | } |
||
370 | |||
371 | /** |
||
372 | * Stop the worker if we have lost connection to a database. |
||
373 | * |
||
374 | * @param \Exception $e |
||
375 | */ |
||
376 | protected function stopWorkerIfLostConnection($e) |
||
377 | { |
||
378 | $message = $e->getMessage(); |
||
379 | $contains = [ |
||
380 | 'server has gone away', |
||
381 | 'no connection to the server', |
||
382 | 'Lost connection', |
||
383 | 'is dead or not enabled', |
||
384 | 'Error while sending', |
||
385 | 'decryption failed or bad record mac', |
||
386 | 'server closed the connection unexpectedly', |
||
387 | 'SSL connection has been closed unexpectedly', |
||
388 | 'Error writing data to the connection', |
||
389 | 'Resource deadlock avoided', |
||
390 | 'Transaction() on null', |
||
391 | ]; |
||
392 | |||
393 | foreach ($contains as $contain) { |
||
394 | if (strpos($message, $contain)) { |
||
395 | $this->shouldQuit = true; |
||
396 | } |
||
397 | } |
||
398 | } |
||
399 | |||
400 | /** |
||
401 | * Raise the before queue job event. |
||
402 | * |
||
403 | * @param string $connectionName |
||
404 | * @param \Illuminate\Contracts\Queue\Job $job |
||
405 | */ |
||
406 | protected function raiseBeforeJobEvent($connectionName, $job) |
||
407 | { |
||
408 | Event::trigger(self::class, self::EVENT_RAISE_BEFORE_JOB, new JobProcessingEvent($connectionName, $job)); |
||
409 | } |
||
410 | |||
411 | /** |
||
412 | * Raise the after queue job event. |
||
413 | * |
||
414 | * @param string $connectionName |
||
415 | * @param \Illuminate\Contracts\Queue\Job $job |
||
416 | */ |
||
417 | protected function raiseAfterJobEvent($connectionName, $job) |
||
418 | { |
||
419 | Event::trigger(self::class, self::EVENT_RAISE_AFTER_JOB, new JobProcessedEvent($connectionName, $job)); |
||
420 | } |
||
421 | |||
422 | /** |
||
423 | * Raise the exception occurred queue job event. |
||
424 | * |
||
425 | * @param string $connectionName |
||
426 | * @param \Illuminate\Contracts\Queue\Job $job |
||
427 | * @param \Exception $e |
||
428 | */ |
||
429 | protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e) |
||
430 | { |
||
431 | Event::trigger(self::class, self::EVENT_RAISE_EXCEPTION_OCCURED_JOB, new JobExceptionOccurredEvent( |
||
432 | $connectionName, $job, $e |
||
433 | )); |
||
434 | } |
||
435 | |||
436 | /** |
||
437 | * Raise the failed queue job event. |
||
438 | * |
||
439 | * @param string $connectionName |
||
440 | * @param \Illuminate\Contracts\Queue\Job $job |
||
441 | * @param \Exception $e |
||
442 | */ |
||
443 | protected function raiseFailedJobEvent($connectionName, $job, $e) |
||
444 | { |
||
445 | Event::trigger(self::class, self::EVENT_RAISE_FAILED_JOB, new JobFailedEvent( |
||
446 | $connectionName, $job, $e |
||
447 | )); |
||
448 | } |
||
449 | |||
450 | /** |
||
451 | * Stop listening and bail out of the script. |
||
452 | */ |
||
453 | public function stop($status = 0) |
||
0 ignored issues
–
show
|
|||
454 | { |
||
455 | Event::trigger(self::class, self::EVENT_STOP, new WorkerStoppingEvent()); |
||
456 | |||
457 | Yii::$app->end(); |
||
458 | } |
||
459 | } |
||
460 |
Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.
Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..