yiicod /
yii2-jobqueue
This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
| 1 | <?php |
||
| 2 | |||
| 3 | namespace yiicod\jobqueue; |
||
| 4 | |||
| 5 | use Exception; |
||
| 6 | use Illuminate\Contracts\Debug\ExceptionHandler; |
||
| 7 | use Illuminate\Queue\Failed\FailedJobProviderInterface; |
||
| 8 | use Illuminate\Queue\Jobs\Job; |
||
| 9 | use Illuminate\Queue\MaxAttemptsExceededException; |
||
| 10 | use Illuminate\Queue\QueueManager; |
||
| 11 | use Illuminate\Queue\WorkerOptions; |
||
| 12 | use Throwable; |
||
| 13 | use Yii; |
||
| 14 | use yii\base\Event; |
||
| 15 | use yiicod\jobqueue\base\FatalThrowableError; |
||
| 16 | use yiicod\jobqueue\events\JobExceptionOccurredEvent; |
||
| 17 | use yiicod\jobqueue\events\JobFailedEvent; |
||
| 18 | use yiicod\jobqueue\events\JobProcessedEvent; |
||
| 19 | use yiicod\jobqueue\events\JobProcessingEvent; |
||
| 20 | use yiicod\jobqueue\events\WorkerStoppingEvent; |
||
| 21 | use yiicod\jobqueue\queues\MongoThreadQueue; |
||
| 22 | |||
| 23 | /** |
||
| 24 | * Worker for laravel queues |
||
| 25 | * |
||
| 26 | * @author Orlov Alexey <[email protected]> |
||
| 27 | * @author Virchenko Maksim <[email protected]> |
||
| 28 | */ |
||
| 29 | class Worker |
||
| 30 | { |
||
| 31 | /** |
||
| 32 | * Events |
||
| 33 | */ |
||
| 34 | const EVENT_RAISE_BEFORE_JOB = 'raiseBeforeJobEvent'; |
||
| 35 | const EVENT_RAISE_AFTER_JOB = 'raiseAfterJobEvent'; |
||
| 36 | const EVENT_RAISE_EXCEPTION_OCCURED_JOB = 'raiseExceptionOccurredJobEvent'; |
||
| 37 | const EVENT_RAISE_FAILED_JOB = 'raiseFailedJobEvent'; |
||
| 38 | const EVENT_STOP = 'stop'; |
||
| 39 | /** |
||
| 40 | * The queue manager instance. |
||
| 41 | * |
||
| 42 | * @var \Illuminate\Queue\QueueManager |
||
| 43 | */ |
||
| 44 | protected $manager; |
||
| 45 | |||
| 46 | /** |
||
| 47 | * Failer instance |
||
| 48 | * |
||
| 49 | * @var FailedJobProviderInterface |
||
| 50 | */ |
||
| 51 | protected $failer; |
||
| 52 | |||
| 53 | /** |
||
| 54 | * The exception handler instance. |
||
| 55 | * |
||
| 56 | * @var \Illuminate\Foundation\Exceptions\Handler |
||
| 57 | */ |
||
| 58 | protected $exceptions; |
||
| 59 | |||
| 60 | protected $shouldQuit = false; |
||
| 61 | |||
| 62 | /** |
||
| 63 | * Create a new queue worker. |
||
| 64 | * |
||
| 65 | * @param QueueManager $manager |
||
| 66 | * @param FailedJobProviderInterface $failer |
||
| 67 | * @param ExceptionHandler $exceptions |
||
| 68 | */ |
||
| 69 | public function __construct(QueueManager $manager, |
||
| 70 | FailedJobProviderInterface $failer, |
||
| 71 | ExceptionHandler $exceptions) |
||
| 72 | { |
||
| 73 | $this->manager = $manager; |
||
| 74 | $this->failer = $failer; |
||
| 75 | $this->exceptions = $exceptions; |
||
|
0 ignored issues
–
show
|
|||
| 76 | } |
||
| 77 | |||
| 78 | /** |
||
| 79 | * Listen to the given queue in a loop. |
||
| 80 | * |
||
| 81 | * @param string $connectionName |
||
| 82 | * @param string $queue |
||
| 83 | * @param WorkerOptions $options |
||
| 84 | * |
||
| 85 | * @return array |
||
| 86 | */ |
||
| 87 | public function daemon($connectionName, $queue, WorkerOptions $options) |
||
| 88 | { |
||
| 89 | while (true) { |
||
| 90 | if ($this->shouldQuit) { |
||
| 91 | $this->kill(); |
||
| 92 | } |
||
| 93 | |||
| 94 | if (false === $this->runNextJob($connectionName, $queue, $options)) { |
||
| 95 | $this->sleep($options->sleep); |
||
| 96 | } |
||
| 97 | |||
| 98 | if ($this->memoryExceeded($options->memory)) { |
||
| 99 | $this->stop(); |
||
| 100 | } |
||
| 101 | } |
||
| 102 | } |
||
| 103 | |||
| 104 | /** |
||
| 105 | * Process the next job on the queue. |
||
| 106 | * |
||
| 107 | * @param string $connectionName |
||
| 108 | * @param string $queue |
||
| 109 | * @param \Illuminate\Queue\WorkerOptions $options |
||
| 110 | */ |
||
| 111 | public function runNextJob($connectionName, $queue, WorkerOptions $options) |
||
| 112 | { |
||
| 113 | /** @var MongoThreadQueue|Queue $connection */ |
||
| 114 | $connection = $this->manager->connection($connectionName); |
||
| 115 | $job = $this->getNextJob($connection, $queue); |
||
| 116 | |||
| 117 | // If we're able to pull a job off of the stack, we will process it and then return |
||
| 118 | // from this method. If there is no job on the queue, we will "sleep" the worker |
||
| 119 | // for the specified number of seconds, then keep processing jobs after sleep. |
||
| 120 | if ($job instanceof Job && $connection->canRunJob($job)) { |
||
| 121 | // If job can be run, then markJobAsReserved and run process |
||
| 122 | $connection->markJobAsReserved($job); |
||
| 123 | |||
| 124 | $this->runInBackground($job, $connectionName); |
||
| 125 | |||
| 126 | return true; |
||
| 127 | } |
||
| 128 | |||
| 129 | return false; |
||
| 130 | } |
||
| 131 | |||
| 132 | /** |
||
| 133 | * Make a Process for the Artisan command for the job id. |
||
| 134 | * |
||
| 135 | * @param Job $job |
||
| 136 | * @param string $connectionName |
||
| 137 | */ |
||
| 138 | protected function runInBackground(Job $job, string $connectionName) |
||
| 139 | { |
||
| 140 | $process = Yii::$container->get(JobProcess::class)->getProcess($job, $connectionName); |
||
| 141 | |||
| 142 | $process->run(); |
||
| 143 | } |
||
| 144 | |||
| 145 | /** |
||
| 146 | * Process the given job from the queue. |
||
| 147 | * |
||
| 148 | * @param string $connectionName |
||
| 149 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 150 | * @param \Illuminate\Queue\WorkerOptions $options |
||
| 151 | * |
||
| 152 | * @throws \Throwable |
||
| 153 | */ |
||
| 154 | public function process($connectionName, $job, WorkerOptions $options) |
||
| 155 | { |
||
| 156 | try { |
||
| 157 | // First we will raise the before job event and determine if the job has already ran |
||
| 158 | // over the its maximum attempt limit, which could primarily happen if the job is |
||
| 159 | // continually timing out and not actually throwing any exceptions from itself. |
||
| 160 | $this->raiseBeforeJobEvent($connectionName, $job); |
||
| 161 | |||
| 162 | $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( |
||
| 163 | $connectionName, $job, (int)$options->maxTries |
||
| 164 | ); |
||
| 165 | |||
| 166 | // Here we will fire off the job and let it process. We will catch any exceptions so |
||
| 167 | // they can be reported to the developers logs, etc. Once the job is finished the |
||
| 168 | // proper events will be fired to let any listeners know this job has finished. |
||
| 169 | $job->fire(); |
||
| 170 | |||
| 171 | $this->raiseAfterJobEvent($connectionName, $job); |
||
| 172 | } catch (Exception $e) { |
||
| 173 | $this->handleJobException($connectionName, $job, $options, $e); |
||
| 174 | } catch (Throwable $e) { |
||
|
0 ignored issues
–
show
|
|||
| 175 | $this->handleJobException( |
||
| 176 | $connectionName, $job, $options, new FatalThrowableError($e) |
||
| 177 | ); |
||
| 178 | } |
||
| 179 | } |
||
| 180 | |||
| 181 | /** |
||
| 182 | * Handle an exception that occurred while the job was running. |
||
| 183 | * |
||
| 184 | * @param string $connectionName |
||
| 185 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 186 | * @param WorkerOptions $options |
||
| 187 | * @param \Exception $e |
||
| 188 | * |
||
| 189 | * @throws \Exception |
||
| 190 | */ |
||
| 191 | protected function handleJobException($connectionName, $job, WorkerOptions $options, $e) |
||
| 192 | { |
||
| 193 | try { |
||
| 194 | // First, we will go ahead and mark the job as failed if it will exceed the maximum |
||
| 195 | // attempts it is allowed to run the next time we process it. If so we will just |
||
| 196 | // go ahead and mark it as failed now so we do not have to release this again. |
||
| 197 | if (!$job->hasFailed()) { |
||
|
0 ignored issues
–
show
The method
hasFailed() does not exist on Illuminate\Contracts\Queue\Job. Did you maybe mean failed()?
This check marks calls to methods that do not seem to exist on an object. This is most likely the result of a method being renamed without all references to it being renamed likewise. Loading history...
|
|||
| 198 | $this->markJobAsFailedIfWillExceedMaxAttempts( |
||
| 199 | $connectionName, $job, (int)$options->maxTries, $e |
||
| 200 | ); |
||
| 201 | } |
||
| 202 | |||
| 203 | $this->raiseExceptionOccurredJobEvent( |
||
| 204 | $connectionName, $job, $e |
||
| 205 | ); |
||
| 206 | } finally { |
||
| 207 | // If we catch an exception, we will attempt to release the job back onto the queue |
||
| 208 | // so it is not lost entirely. This'll let the job be retried at a later time by |
||
| 209 | // another listener (or this same one). We will re-throw this exception after. |
||
| 210 | if (!$job->isDeleted() && !$job->isReleased() && !$job->hasFailed()) { |
||
|
0 ignored issues
–
show
The method
isReleased() does not exist on Illuminate\Contracts\Queue\Job. Did you maybe mean release()?
This check marks calls to methods that do not seem to exist on an object. This is most likely the result of a method being renamed without all references to it being renamed likewise. Loading history...
The method
hasFailed() does not exist on Illuminate\Contracts\Queue\Job. Did you maybe mean failed()?
This check marks calls to methods that do not seem to exist on an object. This is most likely the result of a method being renamed without all references to it being renamed likewise. Loading history...
|
|||
| 211 | $job->release($options->delay); |
||
| 212 | } |
||
| 213 | } |
||
| 214 | |||
| 215 | throw $e; |
||
| 216 | } |
||
| 217 | |||
| 218 | /** |
||
| 219 | * Mark the given job as failed if it has exceeded the maximum allowed attempts. |
||
| 220 | * |
||
| 221 | * This will likely be because the job previously exceeded a timeout. |
||
| 222 | * |
||
| 223 | * @param string $connectionName |
||
| 224 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 225 | * @param int $maxTries |
||
| 226 | */ |
||
| 227 | protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries) |
||
| 228 | { |
||
| 229 | $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; |
||
| 230 | |||
| 231 | if (0 === $maxTries || $job->attempts() <= $maxTries) { |
||
| 232 | return; |
||
| 233 | } |
||
| 234 | |||
| 235 | $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException( |
||
| 236 | 'A queued job has been attempted too many times. The job may have previously timed out.' |
||
| 237 | )); |
||
| 238 | |||
| 239 | throw $e; |
||
| 240 | } |
||
| 241 | |||
| 242 | /** |
||
| 243 | * Mark the given job as failed if it has exceeded the maximum allowed attempts. |
||
| 244 | * |
||
| 245 | * @param string $connectionName |
||
| 246 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 247 | * @param int $maxTries |
||
| 248 | * @param \Exception $e |
||
| 249 | */ |
||
| 250 | protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e) |
||
| 251 | { |
||
| 252 | $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; |
||
| 253 | |||
| 254 | if ($maxTries > 0 && $job->attempts() >= $maxTries) { |
||
| 255 | $this->failJob($connectionName, $job, $e); |
||
| 256 | } |
||
| 257 | } |
||
| 258 | |||
| 259 | /** |
||
| 260 | * Get the next job from the queue connection. |
||
| 261 | * |
||
| 262 | * @param \Illuminate\Contracts\Queue\Queue $connection |
||
| 263 | * @param string $queue |
||
| 264 | * |
||
| 265 | * @return \Illuminate\Contracts\Queue\Job|null |
||
| 266 | */ |
||
| 267 | protected function getNextJob($connection, $queue) |
||
| 268 | { |
||
| 269 | try { |
||
| 270 | foreach (explode(',', $queue) as $queue) { |
||
| 271 | if (!is_null($job = $connection->pop($queue))) { |
||
| 272 | return $job; |
||
| 273 | } |
||
| 274 | } |
||
| 275 | } catch (Exception $e) { |
||
| 276 | $this->exceptions->report($e); |
||
| 277 | } catch (Throwable $e) { |
||
|
0 ignored issues
–
show
|
|||
| 278 | $this->exceptions->report($e = new FatalThrowableError($e)); |
||
| 279 | } |
||
| 280 | } |
||
| 281 | |||
| 282 | /** |
||
| 283 | * Kill the process. |
||
| 284 | * |
||
| 285 | * @param int $status |
||
| 286 | */ |
||
| 287 | public function kill($status = 0) |
||
| 288 | { |
||
| 289 | if (extension_loaded('posix')) { |
||
| 290 | posix_kill(getmypid(), SIGKILL); |
||
| 291 | } |
||
| 292 | |||
| 293 | exit($status); |
||
| 294 | } |
||
| 295 | |||
| 296 | /** |
||
| 297 | * Sleep the script for a given number of seconds. |
||
| 298 | * |
||
| 299 | * @param int $seconds |
||
| 300 | */ |
||
| 301 | public function sleep($seconds) |
||
| 302 | { |
||
| 303 | sleep($seconds); |
||
| 304 | } |
||
| 305 | |||
| 306 | /** |
||
| 307 | * Process the next job on the queue. |
||
| 308 | * |
||
| 309 | * @param string $connectionName |
||
| 310 | * @param $id |
||
| 311 | * @param \Illuminate\Queue\WorkerOptions $options |
||
| 312 | */ |
||
| 313 | public function runJobById($connectionName, $id, WorkerOptions $options) |
||
| 314 | { |
||
| 315 | try { |
||
| 316 | $job = $this->manager->connection($connectionName)->getJobById($id); |
||
|
0 ignored issues
–
show
It seems like you code against a concrete implementation and not the interface
Illuminate\Contracts\Queue\Queue as the method getJobById() does only exist in the following implementations of said interface: yiicod\jobqueue\queues\MongoThreadQueue.
Let’s take a look at an example: interface User
{
/** @return string */
public function getPassword();
}
class MyUser implements User
{
public function getPassword()
{
// return something
}
public function getDisplayName()
{
// return some name.
}
}
class AuthSystem
{
public function authenticate(User $user)
{
$this->logger->info(sprintf('Authenticating %s.', $user->getDisplayName()));
// do something.
}
}
In the above example, the authenticate() method works fine as long as you just pass instances of MyUser. However, if you now also want to pass a different implementation of User which does not have a getDisplayName() method, the code will break. Available Fixes
Note: PHP Analyzer uses reverse abstract interpretation to narrow down the types
inside the if block in such a case.
Loading history...
|
|||
| 317 | |||
| 318 | // If we're able to pull a job off of the stack, we will process it and then return |
||
| 319 | // from this method. If there is no job on the queue, we will "sleep" the worker |
||
| 320 | // for the specified number of seconds, then keep processing jobs after sleep. |
||
| 321 | if ($job) { |
||
| 322 | return $this->process($connectionName, $job, $options); |
||
| 323 | } |
||
| 324 | } catch (Exception $e) { |
||
| 325 | $this->exceptions->report($e); |
||
| 326 | } catch (Throwable $e) { |
||
|
0 ignored issues
–
show
|
|||
| 327 | $this->exceptions->report(new FatalThrowableError($e)); |
||
| 328 | } |
||
| 329 | |||
| 330 | $this->sleep($options->sleep); |
||
| 331 | } |
||
| 332 | |||
| 333 | /** |
||
| 334 | * Mark the given job as failed and raise the relevant event. |
||
| 335 | * |
||
| 336 | * @param string $connectionName |
||
| 337 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 338 | * @param \Exception $e |
||
| 339 | */ |
||
| 340 | protected function failJob($connectionName, $job, $e) |
||
| 341 | { |
||
| 342 | if ($job->isDeleted()) { |
||
| 343 | return; |
||
| 344 | } |
||
| 345 | |||
| 346 | try { |
||
| 347 | // If the job has failed, we will delete it, call the "failed" method and then call |
||
| 348 | // an event indicating the job has failed so it can be logged if needed. This is |
||
| 349 | // to allow every developer to better keep monitor of their failed queue jobs. |
||
| 350 | $job->delete(); |
||
| 351 | |||
| 352 | $job->failed($e); |
||
| 353 | } finally { |
||
| 354 | $this->failer->log($connectionName, $job->getQueue(), $job->getRawBody(), $e); |
||
| 355 | $this->raiseFailedJobEvent($connectionName, $job, $e); |
||
| 356 | } |
||
| 357 | } |
||
| 358 | |||
| 359 | /** |
||
| 360 | * Determine if the memory limit has been exceeded. |
||
| 361 | * |
||
| 362 | * @param int $memoryLimit |
||
| 363 | * |
||
| 364 | * @return bool |
||
| 365 | */ |
||
| 366 | public function memoryExceeded($memoryLimit) |
||
| 367 | { |
||
| 368 | return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; |
||
| 369 | } |
||
| 370 | |||
| 371 | /** |
||
| 372 | * Stop the worker if we have lost connection to a database. |
||
| 373 | * |
||
| 374 | * @param \Exception $e |
||
| 375 | */ |
||
| 376 | protected function stopWorkerIfLostConnection($e) |
||
| 377 | { |
||
| 378 | $message = $e->getMessage(); |
||
| 379 | $contains = [ |
||
| 380 | 'server has gone away', |
||
| 381 | 'no connection to the server', |
||
| 382 | 'Lost connection', |
||
| 383 | 'is dead or not enabled', |
||
| 384 | 'Error while sending', |
||
| 385 | 'decryption failed or bad record mac', |
||
| 386 | 'server closed the connection unexpectedly', |
||
| 387 | 'SSL connection has been closed unexpectedly', |
||
| 388 | 'Error writing data to the connection', |
||
| 389 | 'Resource deadlock avoided', |
||
| 390 | 'Transaction() on null', |
||
| 391 | ]; |
||
| 392 | |||
| 393 | foreach ($contains as $contain) { |
||
| 394 | if (strpos($message, $contain)) { |
||
| 395 | $this->shouldQuit = true; |
||
| 396 | } |
||
| 397 | } |
||
| 398 | } |
||
| 399 | |||
| 400 | /** |
||
| 401 | * Raise the before queue job event. |
||
| 402 | * |
||
| 403 | * @param string $connectionName |
||
| 404 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 405 | */ |
||
| 406 | protected function raiseBeforeJobEvent($connectionName, $job) |
||
| 407 | { |
||
| 408 | Event::trigger(self::class, self::EVENT_RAISE_BEFORE_JOB, new JobProcessingEvent($connectionName, $job)); |
||
| 409 | } |
||
| 410 | |||
| 411 | /** |
||
| 412 | * Raise the after queue job event. |
||
| 413 | * |
||
| 414 | * @param string $connectionName |
||
| 415 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 416 | */ |
||
| 417 | protected function raiseAfterJobEvent($connectionName, $job) |
||
| 418 | { |
||
| 419 | Event::trigger(self::class, self::EVENT_RAISE_AFTER_JOB, new JobProcessedEvent($connectionName, $job)); |
||
| 420 | } |
||
| 421 | |||
| 422 | /** |
||
| 423 | * Raise the exception occurred queue job event. |
||
| 424 | * |
||
| 425 | * @param string $connectionName |
||
| 426 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 427 | * @param \Exception $e |
||
| 428 | */ |
||
| 429 | protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e) |
||
| 430 | { |
||
| 431 | Event::trigger(self::class, self::EVENT_RAISE_EXCEPTION_OCCURED_JOB, new JobExceptionOccurredEvent( |
||
| 432 | $connectionName, $job, $e |
||
| 433 | )); |
||
| 434 | } |
||
| 435 | |||
| 436 | /** |
||
| 437 | * Raise the failed queue job event. |
||
| 438 | * |
||
| 439 | * @param string $connectionName |
||
| 440 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 441 | * @param \Exception $e |
||
| 442 | */ |
||
| 443 | protected function raiseFailedJobEvent($connectionName, $job, $e) |
||
| 444 | { |
||
| 445 | Event::trigger(self::class, self::EVENT_RAISE_FAILED_JOB, new JobFailedEvent( |
||
| 446 | $connectionName, $job, $e |
||
| 447 | )); |
||
| 448 | } |
||
| 449 | |||
| 450 | /** |
||
| 451 | * Stop listening and bail out of the script. |
||
| 452 | */ |
||
| 453 | public function stop($status = 0) |
||
| 454 | { |
||
| 455 | Event::trigger(self::class, self::EVENT_STOP, new WorkerStoppingEvent()); |
||
| 456 | |||
| 457 | Yii::$app->end(); |
||
| 458 | } |
||
| 459 | } |
||
| 460 |
Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.
Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..