sfcod /
jobqueue
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
| 1 | <?php |
||
| 2 | |||
| 3 | namespace SfCod\QueueBundle; |
||
| 4 | |||
| 5 | use Carbon\Carbon; |
||
| 6 | use Exception; |
||
| 7 | use Illuminate\Contracts\Queue\Queue; |
||
| 8 | use Illuminate\Queue\Jobs\Job; |
||
| 9 | use Illuminate\Queue\MaxAttemptsExceededException; |
||
| 10 | use Illuminate\Queue\QueueManager; |
||
| 11 | use SfCod\QueueBundle\Base\FatalThrowableError; |
||
| 12 | use SfCod\QueueBundle\Event\JobExceptionOccurredEvent; |
||
| 13 | use SfCod\QueueBundle\Event\JobFailedEvent; |
||
| 14 | use SfCod\QueueBundle\Event\JobProcessedEvent; |
||
| 15 | use SfCod\QueueBundle\Event\JobProcessingEvent; |
||
| 16 | use SfCod\QueueBundle\Event\WorkerStoppingEvent; |
||
| 17 | use SfCod\QueueBundle\Failer\MongoFailedJobProvider; |
||
| 18 | use SfCod\QueueBundle\Handler\ExceptionHandlerInterface; |
||
| 19 | use SfCod\QueueBundle\Queue\MongoQueue; |
||
| 20 | use SfCod\QueueBundle\Service\JobQueue; |
||
| 21 | use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
||
| 22 | use Symfony\Component\Process\Process; |
||
| 23 | use Throwable; |
||
| 24 | |||
| 25 | /** |
||
| 26 | * Thread worker for job queues |
||
| 27 | * |
||
| 28 | * @author Virchenko Maksim <[email protected]> |
||
| 29 | */ |
||
| 30 | class Worker |
||
| 31 | { |
||
| 32 | /** |
||
| 33 | * Events |
||
| 34 | */ |
||
| 35 | const EVENT_RAISE_BEFORE_JOB = 'job_queue_worker.raise_before_job'; |
||
| 36 | const EVENT_RAISE_AFTER_JOB = 'job_queue_worker.raise_after_job'; |
||
| 37 | const EVENT_RAISE_EXCEPTION_OCCURED_JOB = 'job_queue_worker.raise_exception_occurred_job'; |
||
| 38 | const EVENT_RAISE_FAILED_JOB = 'job_queue_worker.raise_failed_job'; |
||
| 39 | const EVENT_STOP = 'job_queue_worker.stop'; |
||
| 40 | |||
| 41 | /** |
||
| 42 | * @var QueueManager |
||
| 43 | */ |
||
| 44 | private $manager; |
||
| 45 | |||
| 46 | /** |
||
| 47 | * Logger instance |
||
| 48 | * |
||
| 49 | * @var ExceptionHandler |
||
| 50 | */ |
||
| 51 | private $exceptions; |
||
| 52 | |||
| 53 | /** |
||
| 54 | * Failer instance |
||
| 55 | * |
||
| 56 | * @var MongoFailedJobProvider |
||
| 57 | */ |
||
| 58 | private $failer; |
||
| 59 | |||
| 60 | /** |
||
| 61 | * @var EventDispatcherInterface |
||
| 62 | */ |
||
| 63 | private $dispatcher; |
||
| 64 | |||
| 65 | /** |
||
| 66 | * @var JobProcess |
||
| 67 | */ |
||
| 68 | private $jobProcess; |
||
| 69 | |||
| 70 | /** |
||
| 71 | * Create a new queue worker. |
||
| 72 | * |
||
| 73 | * @param QueueManager $manager |
||
|
0 ignored issues
–
show
|
|||
| 74 | * @param MongoFailedJobProvider $failer |
||
| 75 | * @param ExceptionHandler $exceptions |
||
| 76 | */ |
||
| 77 | public function __construct(JobQueue $queue, |
||
| 78 | JobProcess $process, |
||
| 79 | MongoFailedJobProvider $failer, |
||
| 80 | ExceptionHandlerInterface $exceptions, |
||
| 81 | EventDispatcherInterface $dispatcher) |
||
| 82 | { |
||
| 83 | $this->manager = $queue->getQueueManager(); |
||
| 84 | $this->process = $process; |
||
|
0 ignored issues
–
show
The property
process does not seem to exist. Did you mean jobProcess?
An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name. If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading. Loading history...
|
|||
| 85 | $this->failer = $failer; |
||
| 86 | $this->exceptions = $exceptions; |
||
|
0 ignored issues
–
show
It seems like
$exceptions of type object<SfCod\QueueBundle...eptionHandlerInterface> is incompatible with the declared type object<SfCod\QueueBundle\ExceptionHandler> of property $exceptions.
Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property. Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property.. Loading history...
|
|||
| 87 | $this->dispatcher = $dispatcher; |
||
| 88 | } |
||
| 89 | |||
| 90 | /** |
||
| 91 | * Listen to the given queue in a loop. |
||
| 92 | * |
||
| 93 | * @param string $connectionName |
||
| 94 | * @param string $queue |
||
| 95 | * @param Options $options |
||
| 96 | */ |
||
| 97 | public function daemon($connectionName, $queue, Options $options) |
||
| 98 | { |
||
| 99 | while (true) { |
||
| 100 | if (false === $this->runNextJob($connectionName, $queue, $options)) { |
||
| 101 | $this->sleep($options->sleep); |
||
| 102 | } |
||
| 103 | |||
| 104 | if ($this->memoryExceeded($options->memory)) { |
||
| 105 | $this->stop(); |
||
| 106 | } |
||
| 107 | } |
||
| 108 | } |
||
| 109 | |||
| 110 | /** |
||
| 111 | * Process the next job on the queue. |
||
| 112 | * |
||
| 113 | * @param string $connectionName |
||
| 114 | * @param string $queue |
||
| 115 | * @param Options $options |
||
| 116 | * |
||
| 117 | * @return bool |
||
| 118 | */ |
||
| 119 | public function runNextJob($connectionName, $queue, Options $options) |
||
| 120 | { |
||
| 121 | /** @var MongoQueue|Queue $connection */ |
||
| 122 | $connection = $this->manager->connection($connectionName); |
||
| 123 | |||
| 124 | $job = $this->getNextJob($connection, $queue); |
||
| 125 | |||
| 126 | // If we're able to pull a job off of the stack, we will process it and then return |
||
| 127 | // from this method. If there is no job on the queue, we will "sleep" the worker |
||
| 128 | // for the specified number of seconds, then keep processing jobs after sleep. |
||
| 129 | if ($job instanceof Job && $connection->canRunJob($job)) { |
||
| 130 | $connection->markJobAsReserved($job); |
||
| 131 | $this->runInBackground($job, $connectionName); |
||
| 132 | |||
| 133 | return true; |
||
| 134 | } |
||
| 135 | |||
| 136 | return false; |
||
| 137 | } |
||
| 138 | |||
| 139 | /** |
||
| 140 | * Process the next job on the queue. |
||
| 141 | * |
||
| 142 | * @param string $connectionName |
||
| 143 | * @param $id |
||
| 144 | * @param Options $options |
||
| 145 | */ |
||
| 146 | public function runJobById($connectionName, $id, Options $options) |
||
| 147 | { |
||
| 148 | /** @var MongoQueue|Queue $connection */ |
||
| 149 | $connection = $this->manager->connection($connectionName); |
||
| 150 | |||
| 151 | try { |
||
| 152 | $job = $connection->getJobById($id); |
||
| 153 | |||
| 154 | // If we're able to pull a job off of the stack, we will process it and then return |
||
| 155 | // from this method. If there is no job on the queue, we will "sleep" the worker |
||
| 156 | // for the specified number of seconds, then keep processing jobs after sleep. |
||
| 157 | if ($job instanceof Job) { |
||
| 158 | if (false === $job->reserved()) { |
||
| 159 | $connection->markJobAsReserved($job); |
||
| 160 | } |
||
| 161 | |||
| 162 | $this->process($connectionName, $job, $options); |
||
| 163 | |||
| 164 | return; |
||
| 165 | } |
||
| 166 | } catch (Exception $e) { |
||
| 167 | $this->exceptions->report($e); |
||
| 168 | } catch (Throwable $e) { |
||
| 169 | $this->exceptions->report(new FatalThrowableError($e)); |
||
| 170 | } |
||
| 171 | |||
| 172 | $this->sleep($options->sleep); |
||
| 173 | } |
||
| 174 | |||
| 175 | /** |
||
| 176 | * Make a Process for the Artisan command for the job id. |
||
| 177 | * |
||
| 178 | * @param Job $job |
||
| 179 | * @param string $connectionName |
||
| 180 | */ |
||
| 181 | public function runInBackground(Job $job, string $connectionName) |
||
| 182 | { |
||
| 183 | $process = $this->process->getProcess($job, $connectionName); |
||
|
0 ignored issues
–
show
The property
process does not seem to exist. Did you mean jobProcess?
An attempt at access to an undefined property has been detected. This may either be a typographical error or the property has been renamed but there are still references to its old name. If you really want to allow access to undefined properties, you can define magic methods to allow access. See the php core documentation on Overloading. Loading history...
|
|||
| 184 | |||
| 185 | $process->run(); |
||
| 186 | } |
||
| 187 | |||
| 188 | /** Process the given job from the queue. |
||
| 189 | * |
||
| 190 | * @param string $connectionName |
||
| 191 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 192 | * @param Options $options |
||
| 193 | * |
||
| 194 | * @return void |
||
| 195 | * |
||
| 196 | * @throws \Throwable |
||
| 197 | */ |
||
| 198 | public function process($connectionName, $job, Options $options) |
||
| 199 | { |
||
| 200 | try { |
||
| 201 | // First we will raise the before job event and determine if the job has already ran |
||
| 202 | // over the its maximum attempt limit, which could primarily happen if the job is |
||
| 203 | // continually timing out and not actually throwing any exceptions from itself. |
||
| 204 | $this->raiseBeforeJobEvent($connectionName, $job); |
||
| 205 | |||
| 206 | $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( |
||
| 207 | $connectionName, $job, (int)$options->maxTries |
||
| 208 | ); |
||
| 209 | |||
| 210 | // Here we will fire off the job and let it process. We will catch any exceptions so |
||
| 211 | // they can be reported to the developers logs, etc. Once the job is finished the |
||
| 212 | // proper events will be fired to let any listeners know this job has finished. |
||
| 213 | $job->fire(); |
||
| 214 | |||
| 215 | $this->raiseAfterJobEvent($connectionName, $job); |
||
| 216 | } catch (Exception $e) { |
||
| 217 | $this->handleJobException($connectionName, $job, $options, $e); |
||
| 218 | } catch (Throwable $e) { |
||
| 219 | $this->handleJobException( |
||
| 220 | $connectionName, $job, $options, new FatalThrowableError($e) |
||
| 221 | ); |
||
| 222 | } |
||
| 223 | } |
||
| 224 | |||
| 225 | /** |
||
| 226 | * Sleep the script for a given number of seconds. |
||
| 227 | * |
||
| 228 | * @param int $seconds |
||
| 229 | * |
||
| 230 | * @return void |
||
| 231 | */ |
||
| 232 | public function sleep($seconds) |
||
| 233 | { |
||
| 234 | sleep($seconds); |
||
| 235 | } |
||
| 236 | |||
| 237 | /** |
||
| 238 | * Determine if the memory limit has been exceeded. |
||
| 239 | * |
||
| 240 | * @param int $memoryLimit |
||
| 241 | * |
||
| 242 | * @return bool |
||
| 243 | */ |
||
| 244 | public function memoryExceeded($memoryLimit) |
||
| 245 | { |
||
| 246 | return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; |
||
| 247 | } |
||
| 248 | |||
| 249 | /** |
||
| 250 | * Stop listening and bail out of the script. |
||
| 251 | * |
||
| 252 | * @param int $status |
||
| 253 | */ |
||
| 254 | public function stop($status = 0) |
||
|
0 ignored issues
–
show
|
|||
| 255 | { |
||
| 256 | $this->dispatcher->dispatch(self::EVENT_STOP, new WorkerStoppingEvent()); |
||
| 257 | |||
| 258 | exit(0); |
||
| 259 | } |
||
| 260 | |||
| 261 | /** |
||
| 262 | * Mark the given job as failed if it has exceeded the maximum allowed attempts. |
||
| 263 | * |
||
| 264 | * This will likely be because the job previously exceeded a timeout. |
||
| 265 | * |
||
| 266 | * @param string $connectionName |
||
| 267 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 268 | * @param int $maxTries |
||
| 269 | * |
||
| 270 | * @return void |
||
| 271 | */ |
||
| 272 | protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries) |
||
| 273 | { |
||
| 274 | $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; |
||
| 275 | |||
| 276 | $timeoutAt = $job->timeoutAt(); |
||
| 277 | |||
| 278 | if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) { |
||
| 279 | return; |
||
| 280 | } |
||
| 281 | |||
| 282 | if (!$timeoutAt && (0 === $maxTries || $job->attempts() <= $maxTries)) { |
||
| 283 | return; |
||
| 284 | } |
||
| 285 | |||
| 286 | $this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException( |
||
| 287 | 'A queued job has been attempted too many times or run too long. The job may have previously timed out.' |
||
| 288 | )); |
||
| 289 | |||
| 290 | throw $e; |
||
| 291 | } |
||
| 292 | |||
| 293 | /** |
||
| 294 | * Mark the given job as failed and raise the relevant event. |
||
| 295 | * |
||
| 296 | * @param string $connectionName |
||
| 297 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 298 | * @param \Exception $e |
||
| 299 | */ |
||
| 300 | protected function failJob($connectionName, $job, $e) |
||
| 301 | { |
||
| 302 | if ($job->isDeleted()) { |
||
| 303 | return; |
||
| 304 | } |
||
| 305 | |||
| 306 | try { |
||
| 307 | // If the job has failed, we will delete it, call the "failed" method and then call |
||
| 308 | // an event indicating the job has failed so it can be logged if needed. This is |
||
| 309 | // to allow every developer to better keep monitor of their failed queue jobs. |
||
| 310 | $job->delete(); |
||
| 311 | |||
| 312 | $job->failed($e); |
||
| 313 | } finally { |
||
| 314 | $this->failer->log($connectionName, $job->getQueue(), $job->getRawBody(), $e); |
||
| 315 | $this->raiseFailedJobEvent($connectionName, $job, $e); |
||
| 316 | } |
||
| 317 | } |
||
| 318 | |||
| 319 | /** |
||
| 320 | * Handle an exception that occurred while the job was running. |
||
| 321 | * |
||
| 322 | * @param string $connectionName |
||
| 323 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 324 | * @param Options $options |
||
| 325 | * @param \Exception $e |
||
| 326 | * |
||
| 327 | * @return void |
||
| 328 | * |
||
| 329 | * @throws \Exception |
||
| 330 | */ |
||
| 331 | protected function handleJobException($connectionName, $job, Options $options, $e) |
||
| 332 | { |
||
| 333 | try { |
||
| 334 | // First, we will go ahead and mark the job as failed if it will exceed the maximum |
||
| 335 | // attempts it is allowed to run the next time we process it. If so we will just |
||
| 336 | // go ahead and mark it as failed now so we do not have to release this again. |
||
| 337 | if (!$job->hasFailed()) { |
||
| 338 | $this->markJobAsFailedIfWillExceedMaxAttempts( |
||
| 339 | $connectionName, $job, (int)$options->maxTries, $e |
||
| 340 | ); |
||
| 341 | } |
||
| 342 | |||
| 343 | $this->raiseExceptionOccurredJobEvent( |
||
| 344 | $connectionName, $job, $e |
||
| 345 | ); |
||
| 346 | } finally { |
||
| 347 | // If we catch an exception, we will attempt to release the job back onto the queue |
||
| 348 | // so it is not lost entirely. This'll let the job be retried at a later time by |
||
| 349 | // another listener (or this same one). We will re-throw this exception after. |
||
| 350 | if (!$job->isDeleted() && !$job->isReleased() && !$job->hasFailed()) { |
||
| 351 | $job->release($options->delay); |
||
| 352 | } |
||
| 353 | } |
||
| 354 | |||
| 355 | throw $e; |
||
| 356 | } |
||
| 357 | |||
| 358 | /** |
||
| 359 | * Mark the given job as failed if it has exceeded the maximum allowed attempts. |
||
| 360 | * |
||
| 361 | * @param string $connectionName |
||
| 362 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 363 | * @param int $maxTries |
||
| 364 | * @param \Exception $e |
||
| 365 | * |
||
| 366 | * @return void |
||
| 367 | */ |
||
| 368 | protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e) |
||
| 369 | { |
||
| 370 | $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; |
||
| 371 | |||
| 372 | if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) { |
||
| 373 | $this->failJob($connectionName, $job, $e); |
||
| 374 | } |
||
| 375 | |||
| 376 | if ($maxTries > 0 && $job->attempts() >= $maxTries) { |
||
| 377 | $this->failJob($connectionName, $job, $e); |
||
| 378 | } |
||
| 379 | } |
||
| 380 | |||
| 381 | /** |
||
| 382 | * Get the next job from the queue connection. |
||
| 383 | * |
||
| 384 | * @param \Illuminate\Contracts\Queue\Queue $connection |
||
| 385 | * @param string $queue |
||
| 386 | * |
||
| 387 | * @return \Illuminate\Contracts\Queue\Job|null |
||
| 388 | */ |
||
| 389 | protected function getNextJob($connection, $queue) |
||
| 390 | { |
||
| 391 | try { |
||
| 392 | foreach (explode(',', $queue) as $queue) { |
||
| 393 | if (!is_null($job = $connection->pop($queue))) { |
||
| 394 | return $job; |
||
| 395 | } |
||
| 396 | } |
||
| 397 | } catch (Exception $e) { |
||
| 398 | $this->exceptions->report($e); |
||
| 399 | } catch (Throwable $e) { |
||
| 400 | $this->exceptions->report($e = new FatalThrowableError($e)); |
||
| 401 | } |
||
| 402 | } |
||
| 403 | |||
| 404 | /** |
||
| 405 | * Raise the before queue job event. |
||
| 406 | * |
||
| 407 | * @param string $connectionName |
||
| 408 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 409 | */ |
||
| 410 | protected function raiseBeforeJobEvent($connectionName, $job) |
||
| 411 | { |
||
| 412 | $this->dispatcher->dispatch(self::EVENT_RAISE_AFTER_JOB, new JobProcessingEvent($connectionName, $job)); |
||
| 413 | } |
||
| 414 | |||
| 415 | /** |
||
| 416 | * Raise the after queue job event. |
||
| 417 | * |
||
| 418 | * @param string $connectionName |
||
| 419 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 420 | */ |
||
| 421 | protected function raiseAfterJobEvent($connectionName, $job) |
||
| 422 | { |
||
| 423 | $this->dispatcher->dispatch(self::EVENT_RAISE_AFTER_JOB, new JobProcessedEvent($connectionName, $job)); |
||
| 424 | } |
||
| 425 | |||
| 426 | /** |
||
| 427 | * Raise the exception occurred queue job event. |
||
| 428 | * |
||
| 429 | * @param string $connectionName |
||
| 430 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 431 | * @param \Exception $e |
||
| 432 | */ |
||
| 433 | protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e) |
||
| 434 | { |
||
| 435 | $this->dispatcher->dispatch(self::EVENT_RAISE_EXCEPTION_OCCURED_JOB, new JobExceptionOccurredEvent($connectionName, $job, $e)); |
||
| 436 | } |
||
| 437 | |||
| 438 | /** |
||
| 439 | * Raise the failed queue job event. |
||
| 440 | * |
||
| 441 | * @param string $connectionName |
||
| 442 | * @param \Illuminate\Contracts\Queue\Job $job |
||
| 443 | * @param \Exception $e |
||
| 444 | */ |
||
| 445 | protected function raiseFailedJobEvent($connectionName, $job, $e) |
||
| 446 | { |
||
| 447 | $this->dispatcher->dispatch(self::EVENT_RAISE_FAILED_JOB, new JobFailedEvent($connectionName, $job, $e)); |
||
| 448 | } |
||
| 449 | } |
||
| 450 |
This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.
Consider the following example. The parameter
$italyis not defined by the methodfinale(...).The most likely cause is that the parameter was removed, but the annotation was not.