resque /
php-resque
| 1 | <?php |
||||
| 2 | 1 | ||||
| 3 | declare(ticks=1); |
||||
| 4 | |||||
| 5 | /** |
||||
| 6 | * Resque worker that handles checking queues for jobs, fetching them |
||||
| 7 | * off the queues, running them and handling the result. |
||||
| 8 | * |
||||
| 9 | * @package Resque/Worker |
||||
| 10 | * @author Chris Boulton <[email protected]> |
||||
| 11 | * @license http://www.opensource.org/licenses/mit-license.php |
||||
| 12 | */ |
||||
| 13 | class Resque_Worker |
||||
| 14 | { |
||||
| 15 | /** |
||||
| 16 | * @var string Prefix for the process name |
||||
| 17 | */ |
||||
| 18 | private static $processPrefix = 'resque'; |
||||
| 19 | |||||
| 20 | /** |
||||
| 21 | * @var Psr\Log\LoggerInterface Logging object that impliments the PSR-3 LoggerInterface |
||||
| 22 | */ |
||||
| 23 | public $logger; |
||||
| 24 | |||||
| 25 | /** |
||||
| 26 | * @var bool Whether this worker is running in a forked child process. |
||||
| 27 | */ |
||||
| 28 | public $hasParent = false; |
||||
| 29 | |||||
| 30 | /** |
||||
| 31 | * @var array Array of all associated queues for this worker. |
||||
| 32 | */ |
||||
| 33 | private $queues = array(); |
||||
| 34 | |||||
| 35 | /** |
||||
| 36 | * @var string The hostname of this worker. |
||||
| 37 | */ |
||||
| 38 | private $hostname; |
||||
| 39 | |||||
| 40 | /** |
||||
| 41 | * @var boolean True if on the next iteration, the worker should shutdown. |
||||
| 42 | */ |
||||
| 43 | private $shutdown = false; |
||||
| 44 | |||||
| 45 | /** |
||||
| 46 | * @var boolean True if this worker is paused. |
||||
| 47 | */ |
||||
| 48 | private $paused = false; |
||||
| 49 | |||||
| 50 | /** |
||||
| 51 | * @var string String identifying this worker. |
||||
| 52 | */ |
||||
| 53 | private $id; |
||||
| 54 | |||||
| 55 | /** |
||||
| 56 | * @var Resque_Job Current job, if any, being processed by this worker. |
||||
| 57 | */ |
||||
| 58 | private $currentJob = null; |
||||
| 59 | |||||
| 60 | /** |
||||
| 61 | * @var int Process ID of child worker processes. |
||||
| 62 | */ |
||||
| 63 | private $child = null; |
||||
| 64 | |||||
| 65 | /** |
||||
| 66 | * Instantiate a new worker, given a list of queues that it should be working |
||||
| 67 | * on. The list of queues should be supplied in the priority that they should |
||||
| 68 | * be checked for jobs (first come, first served) |
||||
| 69 | * |
||||
| 70 | * Passing a single '*' allows the worker to work on all queues in alphabetical |
||||
| 71 | * order. You can easily add new queues dynamically and have them worked on using |
||||
| 72 | * this method. |
||||
| 73 | * |
||||
| 74 | * @param string|array $queues String with a single queue name, array with multiple. |
||||
| 75 | 69 | */ |
|||
| 76 | public function __construct($queues) |
||||
| 77 | 69 | { |
|||
| 78 | $this->logger = new Resque_Log(); |
||||
| 79 | 69 | ||||
| 80 | 67 | if (!is_array($queues)) { |
|||
| 81 | $queues = array($queues); |
||||
| 82 | } |
||||
| 83 | 69 | ||||
| 84 | 69 | $this->queues = $queues; |
|||
| 85 | $this->hostname = php_uname('n'); |
||||
| 86 | 69 | ||||
| 87 | 69 | $this->id = $this->hostname . ':' . getmypid() . ':' . implode(',', $this->queues); |
|||
| 88 | } |
||||
| 89 | |||||
| 90 | /** |
||||
| 91 | * Set the process prefix of the workers to the given prefix string. |
||||
| 92 | * @param string $prefix The new process prefix |
||||
| 93 | */ |
||||
| 94 | public static function setProcessPrefix($prefix) |
||||
| 95 | { |
||||
| 96 | self::$processPrefix = $prefix; |
||||
| 97 | } |
||||
| 98 | |||||
| 99 | /** |
||||
| 100 | * Return all workers known to Resque as instantiated instances. |
||||
| 101 | * @return array |
||||
| 102 | 17 | */ |
|||
| 103 | public static function all() |
||||
| 104 | 17 | { |
|||
| 105 | 17 | $workers = Resque::redis()->smembers('workers'); |
|||
|
0 ignored issues
–
show
Bug
introduced
by
Loading history...
|
|||||
| 106 | if (!is_array($workers)) { |
||||
| 107 | $workers = array(); |
||||
| 108 | } |
||||
| 109 | 17 | ||||
| 110 | 17 | $instances = array(); |
|||
| 111 | 8 | foreach ($workers as $workerId) { |
|||
| 112 | $instances[] = self::find($workerId); |
||||
| 113 | 17 | } |
|||
| 114 | return $instances; |
||||
| 115 | } |
||||
| 116 | |||||
| 117 | /** |
||||
| 118 | * Given a worker ID, check if it is registered/valid. |
||||
| 119 | * |
||||
| 120 | * @param string $workerId ID of the worker. |
||||
| 121 | * @return boolean True if the worker exists, false if not. |
||||
| 122 | 11 | */ |
|||
| 123 | public static function exists($workerId) |
||||
| 124 | 11 | { |
|||
| 125 | return (bool)Resque::redis()->sismember('workers', $workerId); |
||||
|
0 ignored issues
–
show
The method
sismember() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 126 | } |
||||
| 127 | |||||
| 128 | /** |
||||
| 129 | * Given a worker ID, find it and return an instantiated worker class for it. |
||||
| 130 | * |
||||
| 131 | * @param string $workerId The ID of the worker. |
||||
| 132 | * @return Resque_Worker Instance of the worker. False if the worker does not exist. |
||||
| 133 | 9 | */ |
|||
| 134 | public static function find($workerId) |
||||
| 135 | 9 | { |
|||
| 136 | if (!self::exists($workerId) || false === strpos($workerId, ":")) { |
||||
| 137 | return false; |
||||
|
0 ignored issues
–
show
|
|||||
| 138 | } |
||||
| 139 | 9 | ||||
| 140 | 9 | list($hostname, $pid, $queues) = explode(':', $workerId, 3); |
|||
| 141 | 9 | $queues = explode(',', $queues); |
|||
| 142 | 9 | $worker = new self($queues); |
|||
| 143 | 9 | $worker->setId($workerId); |
|||
| 144 | return $worker; |
||||
| 145 | } |
||||
| 146 | |||||
| 147 | /** |
||||
| 148 | * Set the ID of this worker to a given ID string. |
||||
| 149 | * |
||||
| 150 | * @param string $workerId ID for the worker. |
||||
| 151 | 9 | */ |
|||
| 152 | public function setId($workerId) |
||||
| 153 | 9 | { |
|||
| 154 | 9 | $this->id = $workerId; |
|||
| 155 | } |
||||
| 156 | |||||
| 157 | /** |
||||
| 158 | * The primary loop for a worker which when called on an instance starts |
||||
| 159 | * the worker's life cycle. |
||||
| 160 | * |
||||
| 161 | * Queues are checked every $interval (seconds) for new jobs. |
||||
| 162 | * |
||||
| 163 | * @param int $interval How often to check for new jobs across the queues. |
||||
| 164 | 13 | */ |
|||
| 165 | public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) |
||||
| 166 | 13 | { |
|||
| 167 | 13 | $this->updateProcLine('Starting'); |
|||
| 168 | $this->startup(); |
||||
| 169 | 13 | ||||
| 170 | 13 | if (function_exists('pcntl_signal_dispatch')) { |
|||
| 171 | pcntl_signal_dispatch(); |
||||
| 172 | } |
||||
| 173 | 13 | ||||
| 174 | 13 | while (true) { |
|||
| 175 | if ($this->shutdown) { |
||||
| 176 | break; |
||||
| 177 | } |
||||
| 178 | |||||
| 179 | // is redis still alive? |
||||
| 180 | 13 | try { |
|||
| 181 | if (!$this->paused && Resque::redis()->ping() === false) { |
||||
|
0 ignored issues
–
show
The method
ping() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 182 | throw new CredisException('redis ping() failed'); |
||||
| 183 | } |
||||
| 184 | } catch (CredisException $e) { |
||||
| 185 | $this->logger->log(Psr\Log\LogLevel::ERROR, 'redis went away. trying to reconnect'); |
||||
| 186 | Resque::$redis = null; |
||||
| 187 | usleep($interval * 1000000); |
||||
| 188 | continue; |
||||
| 189 | } |
||||
| 190 | |||||
| 191 | 13 | // Attempt to find and reserve a job |
|||
| 192 | 13 | $job = false; |
|||
| 193 | 12 | if (!$this->paused) { |
|||
| 194 | if ($blocking === true) { |
||||
| 195 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval)); |
||||
| 196 | $this->updateProcLine('Waiting with blocking timeout ' . $interval); |
||||
| 197 | 12 | } else { |
|||
| 198 | $this->updateProcLine('Waiting with interval ' . $interval); |
||||
| 199 | } |
||||
| 200 | 12 | ||||
| 201 | $job = $this->reserve($blocking, $interval); |
||||
| 202 | } |
||||
| 203 | 13 | ||||
| 204 | if (!$job) { |
||||
| 205 | 13 | // For an interval of 0, break now - helps with unit testing etc |
|||
| 206 | 13 | if ($interval == 0) { |
|||
| 207 | break; |
||||
| 208 | } |
||||
| 209 | |||||
| 210 | if ($blocking === false) { |
||||
| 211 | // If no job was found, we sleep for $interval before continuing and checking again |
||||
| 212 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval)); |
||||
| 213 | if ($this->paused) { |
||||
| 214 | $this->updateProcLine('Paused'); |
||||
| 215 | } else { |
||||
| 216 | $this->updateProcLine('Waiting'); |
||||
| 217 | } |
||||
| 218 | |||||
| 219 | usleep($interval * 1000000); |
||||
| 220 | } |
||||
| 221 | |||||
| 222 | continue; |
||||
| 223 | } |
||||
| 224 | |||||
| 225 | $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job)); |
||||
| 226 | 9 | Resque_Event::trigger('beforeFork', $job); |
|||
| 227 | 9 | $this->workingOn($job); |
|||
|
0 ignored issues
–
show
It seems like
$job can also be of type true; however, parameter $job of Resque_Worker::workingOn() does only seem to accept Resque_Job, maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 228 | 9 | ||||
| 229 | $this->child = Resque::fork(); |
||||
| 230 | 9 | ||||
| 231 | // Forked and we're the child. Or PCNTL is not installed. Run the job. |
||||
| 232 | if ($this->child === 0 || $this->child === false || $this->child === -1) { |
||||
| 233 | 9 | $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); |
|||
| 234 | $this->updateProcLine($status); |
||||
| 235 | $this->logger->log(Psr\Log\LogLevel::INFO, $status); |
||||
| 236 | |||||
| 237 | if (!empty($job->payload['id'])) { |
||||
| 238 | Resque_Job_PID::create($job->payload['id']); |
||||
| 239 | } |
||||
| 240 | |||||
| 241 | $this->perform($job); |
||||
|
0 ignored issues
–
show
It seems like
$job can also be of type true; however, parameter $job of Resque_Worker::perform() does only seem to accept Resque_Job, maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 242 | |||||
| 243 | if (!empty($job->payload['id'])) { |
||||
| 244 | Resque_Job_PID::del($job->payload['id']); |
||||
| 245 | } |
||||
| 246 | |||||
| 247 | if ($this->child === 0) { |
||||
| 248 | exit(0); |
||||
|
0 ignored issues
–
show
|
|||||
| 249 | } |
||||
| 250 | } |
||||
| 251 | |||||
| 252 | if ($this->child > 0) { |
||||
| 253 | 9 | // Parent process, sit and wait |
|||
| 254 | $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); |
||||
| 255 | 9 | $this->updateProcLine($status); |
|||
| 256 | 9 | $this->logger->log(Psr\Log\LogLevel::INFO, $status); |
|||
| 257 | 9 | ||||
| 258 | // Wait until the child process finishes before continuing |
||||
| 259 | while (pcntl_wait($status, WNOHANG) === 0) { |
||||
| 260 | 9 | if (function_exists('pcntl_signal_dispatch')) { |
|||
| 261 | 9 | pcntl_signal_dispatch(); |
|||
| 262 | 9 | } |
|||
| 263 | |||||
| 264 | // Pause for a half a second to conserve system resources |
||||
| 265 | usleep(500000); |
||||
| 266 | 9 | } |
|||
| 267 | |||||
| 268 | if (pcntl_wifexited($status) !== true) { |
||||
| 269 | 9 | $job->fail(new Resque_Job_DirtyExitException('Job exited abnormally')); |
|||
| 270 | } elseif (($exitStatus = pcntl_wexitstatus($status)) !== 0) { |
||||
| 271 | 9 | $job->fail(new Resque_Job_DirtyExitException( |
|||
| 272 | 'Job exited with exit code ' . $exitStatus |
||||
| 273 | )); |
||||
| 274 | } else { |
||||
| 275 | if (in_array($job->getStatus(), array(Resque_Job_Status::STATUS_WAITING, Resque_Job_Status::STATUS_RUNNING))) { |
||||
| 276 | $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE); |
||||
| 277 | $this->logger->log(Psr\Log\LogLevel::INFO, 'done ' . $job); |
||||
|
0 ignored issues
–
show
Are you sure
$job of type object|true can be used in concatenation?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 278 | 9 | } |
|||
| 279 | } |
||||
| 280 | 1 | } |
|||
| 281 | 1 | ||||
| 282 | $this->child = null; |
||||
| 283 | $this->doneWorking(); |
||||
| 284 | } |
||||
| 285 | |||||
| 286 | 9 | $this->unregisterWorker(); |
|||
| 287 | 9 | } |
|||
| 288 | |||||
| 289 | /** |
||||
| 290 | 13 | * Process a single job. |
|||
| 291 | 13 | * |
|||
| 292 | * @param Resque_Job $job The job to be processed. |
||||
| 293 | */ |
||||
| 294 | public function perform(Resque_Job $job) |
||||
| 295 | { |
||||
| 296 | $result = null; |
||||
|
0 ignored issues
–
show
|
|||||
| 297 | try { |
||||
| 298 | 5 | Resque_Event::trigger('afterFork', $job); |
|||
| 299 | $result = $job->perform(); |
||||
| 300 | 5 | } catch (Exception $e) { |
|||
| 301 | $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e)); |
||||
| 302 | 5 | $job->fail($e); |
|||
| 303 | 5 | return; |
|||
| 304 | } catch (Error $e) { |
||||
| 305 | 1 | $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e)); |
|||
| 306 | 1 | $job->fail($e); |
|||
| 307 | 1 | return; |
|||
| 308 | 1 | } |
|||
| 309 | |||||
| 310 | $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE, $result); |
||||
| 311 | $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job)); |
||||
| 312 | } |
||||
| 313 | |||||
| 314 | /** |
||||
| 315 | * @param bool $blocking |
||||
| 316 | 4 | * @param int $timeout |
|||
| 317 | 4 | * @return object|boolean Instance of Resque_Job if a job is found, false if not. |
|||
| 318 | 4 | */ |
|||
| 319 | public function reserve($blocking = false, $timeout = null) |
||||
| 320 | { |
||||
| 321 | if ($this->hasParent && !posix_kill(posix_getppid(), 0)) { |
||||
| 322 | $this->shutdown(); |
||||
| 323 | return false; |
||||
| 324 | } |
||||
| 325 | 22 | ||||
| 326 | $queues = $this->queues(); |
||||
| 327 | 22 | if (!is_array($queues)) { |
|||
|
0 ignored issues
–
show
|
|||||
| 328 | return; |
||||
| 329 | } |
||||
| 330 | |||||
| 331 | if ($blocking === true) { |
||||
| 332 | if (empty($queues)) { |
||||
| 333 | 22 | $this->logger->log(Psr\Log\LogLevel::INFO, 'No queue was found, sleeping for {interval}', array('interval' => $timeout)); |
|||
| 334 | 22 | usleep($timeout * 1000000); |
|||
| 335 | return false; |
||||
| 336 | } |
||||
| 337 | $job = Resque_Job::reserveBlocking($queues, $timeout); |
||||
| 338 | 22 | if ($job) { |
|||
|
0 ignored issues
–
show
|
|||||
| 339 | 1 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); |
|||
| 340 | return $job; |
||||
| 341 | } |
||||
| 342 | } else { |
||||
| 343 | foreach ($queues as $queue) { |
||||
| 344 | 1 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue)); |
|||
| 345 | 1 | $job = Resque_Job::reserve($queue); |
|||
| 346 | 1 | if ($job) { |
|||
| 347 | 1 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue)); |
|||
| 348 | return $job; |
||||
| 349 | } |
||||
| 350 | 21 | } |
|||
| 351 | 21 | } |
|||
| 352 | 21 | ||||
| 353 | 21 | return false; |
|||
| 354 | 17 | } |
|||
| 355 | 17 | ||||
| 356 | /** |
||||
| 357 | * Return an array containing all of the queues that this worker should use |
||||
| 358 | * when searching for jobs. |
||||
| 359 | * |
||||
| 360 | 13 | * If * is found in the list of queues, every queue will be searched in |
|||
| 361 | * alphabetic order. (@see $fetch) |
||||
| 362 | * |
||||
| 363 | * @param boolean $fetch If true, and the queue is set to *, will fetch |
||||
| 364 | * all queue names from redis. |
||||
| 365 | * @return array Array of associated queues. |
||||
| 366 | */ |
||||
| 367 | public function queues($fetch = true) |
||||
| 368 | { |
||||
| 369 | if (!in_array('*', $this->queues) || $fetch == false) { |
||||
|
0 ignored issues
–
show
|
|||||
| 370 | return $this->queues; |
||||
| 371 | } |
||||
| 372 | |||||
| 373 | $queues = Resque::queues(); |
||||
| 374 | 22 | sort($queues); |
|||
| 375 | return $queues; |
||||
| 376 | 22 | } |
|||
| 377 | 20 | ||||
| 378 | /** |
||||
| 379 | * Perform necessary actions to start a worker. |
||||
| 380 | 2 | */ |
|||
| 381 | 2 | private function startup() |
|||
| 382 | 2 | { |
|||
| 383 | $this->registerSigHandlers(); |
||||
| 384 | $this->pruneDeadWorkers(); |
||||
| 385 | Resque_Event::trigger('beforeFirstFork', $this); |
||||
| 386 | $this->registerWorker(); |
||||
| 387 | } |
||||
| 388 | 13 | ||||
| 389 | /** |
||||
| 390 | 13 | * On supported systems (with the PECL proctitle module installed), update |
|||
| 391 | 13 | * the name of the currently running process to indicate the current state |
|||
| 392 | 13 | * of a worker. |
|||
| 393 | 13 | * |
|||
| 394 | 13 | * @param string $status The updated process title. |
|||
| 395 | */ |
||||
| 396 | private function updateProcLine($status) |
||||
| 397 | { |
||||
| 398 | $processTitle = static::$processPrefix . '-' . Resque::VERSION . ' (' . implode(',', $this->queues) . '): ' . $status; |
||||
|
0 ignored issues
–
show
|
|||||
| 399 | if (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') { |
||||
| 400 | cli_set_process_title($processTitle); |
||||
| 401 | } elseif (function_exists('setproctitle')) { |
||||
| 402 | setproctitle($processTitle); |
||||
| 403 | 13 | } |
|||
| 404 | } |
||||
| 405 | 13 | ||||
| 406 | 13 | /** |
|||
| 407 | 13 | * Register signal handlers that a worker should respond to. |
|||
| 408 | * |
||||
| 409 | * TERM: Shutdown immediately and stop processing jobs. |
||||
| 410 | * INT: Shutdown immediately and stop processing jobs. |
||||
| 411 | * QUIT: Shutdown after the current job finishes processing. |
||||
| 412 | 13 | * USR1: Kill the forked child immediately and continue processing jobs. |
|||
| 413 | */ |
||||
| 414 | private function registerSigHandlers() |
||||
| 415 | { |
||||
| 416 | if (!function_exists('pcntl_signal')) { |
||||
| 417 | return; |
||||
| 418 | } |
||||
| 419 | |||||
| 420 | pcntl_signal(SIGTERM, array($this, 'shutDownNow')); |
||||
| 421 | pcntl_signal(SIGINT, array($this, 'shutDownNow')); |
||||
| 422 | 13 | pcntl_signal(SIGQUIT, array($this, 'shutdown')); |
|||
| 423 | pcntl_signal(SIGUSR1, array($this, 'killChild')); |
||||
| 424 | 13 | pcntl_signal(SIGUSR2, array($this, 'pauseProcessing')); |
|||
| 425 | pcntl_signal(SIGCONT, array($this, 'unPauseProcessing')); |
||||
| 426 | $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals'); |
||||
| 427 | } |
||||
| 428 | 13 | ||||
| 429 | 13 | /** |
|||
| 430 | 13 | * Signal handler callback for USR2, pauses processing of new jobs. |
|||
| 431 | 13 | */ |
|||
| 432 | 13 | public function pauseProcessing() |
|||
| 433 | 13 | { |
|||
| 434 | 13 | $this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing'); |
|||
| 435 | 13 | $this->paused = true; |
|||
| 436 | } |
||||
| 437 | |||||
| 438 | /** |
||||
| 439 | * Signal handler callback for CONT, resumes worker allowing it to pick |
||||
| 440 | 2 | * up new jobs. |
|||
| 441 | */ |
||||
| 442 | 2 | public function unPauseProcessing() |
|||
| 443 | 2 | { |
|||
| 444 | 2 | $this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing'); |
|||
| 445 | $this->paused = false; |
||||
| 446 | } |
||||
| 447 | |||||
| 448 | /** |
||||
| 449 | * Schedule a worker for shutdown. Will finish processing the current job |
||||
| 450 | 1 | * and when the timeout interval is reached, the worker will shut down. |
|||
| 451 | */ |
||||
| 452 | 1 | public function shutdown() |
|||
| 453 | 1 | { |
|||
| 454 | 1 | $this->shutdown = true; |
|||
| 455 | $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down'); |
||||
| 456 | } |
||||
| 457 | |||||
| 458 | /** |
||||
| 459 | * Force an immediate shutdown of the worker, killing any child jobs |
||||
| 460 | * currently running. |
||||
| 461 | */ |
||||
| 462 | public function shutdownNow() |
||||
| 463 | { |
||||
| 464 | $this->shutdown(); |
||||
| 465 | $this->killChild(); |
||||
| 466 | } |
||||
| 467 | |||||
| 468 | /** |
||||
| 469 | * @return int Child process PID. |
||||
| 470 | */ |
||||
| 471 | public function getChildPID() |
||||
| 472 | { |
||||
| 473 | return $this->child; |
||||
| 474 | } |
||||
| 475 | |||||
| 476 | /** |
||||
| 477 | * Kill a forked child job immediately. The job it is processing will not |
||||
| 478 | * be completed. |
||||
| 479 | */ |
||||
| 480 | public function killChild() |
||||
| 481 | { |
||||
| 482 | if (!$this->child) { |
||||
| 483 | $this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.'); |
||||
| 484 | return; |
||||
| 485 | } |
||||
| 486 | |||||
| 487 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child)); |
||||
| 488 | if (exec('ps -o pid,s -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { |
||||
| 489 | $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child)); |
||||
| 490 | posix_kill($this->child, SIGKILL); |
||||
| 491 | $this->child = null; |
||||
| 492 | } else { |
||||
| 493 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child)); |
||||
| 494 | $this->shutdown(); |
||||
| 495 | } |
||||
| 496 | } |
||||
| 497 | |||||
| 498 | /** |
||||
| 499 | * Look for any workers which should be running on this server and if |
||||
| 500 | * they're not, remove them from Redis. |
||||
| 501 | * |
||||
| 502 | * This is a form of garbage collection to handle cases where the |
||||
| 503 | * server may have been killed and the Resque workers did not die gracefully |
||||
| 504 | * and therefore leave state information in Redis. |
||||
| 505 | */ |
||||
| 506 | public function pruneDeadWorkers() |
||||
| 507 | { |
||||
| 508 | $workerPids = $this->workerPids(); |
||||
| 509 | $workers = self::all(); |
||||
| 510 | foreach ($workers as $worker) { |
||||
| 511 | if (is_object($worker)) { |
||||
| 512 | list($host, $pid, $queues) = explode(':', (string)$worker, 3); |
||||
| 513 | if ($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) { |
||||
| 514 | 15 | continue; |
|||
| 515 | } |
||||
| 516 | 15 | $this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker)); |
|||
| 517 | 15 | $worker->unregisterWorker(); |
|||
| 518 | 15 | } |
|||
| 519 | 7 | } |
|||
| 520 | 7 | } |
|||
| 521 | 7 | ||||
| 522 | 7 | /** |
|||
| 523 | * Return an array of process IDs for all of the Resque workers currently |
||||
| 524 | 2 | * running on this machine. |
|||
| 525 | 2 | * |
|||
| 526 | * @return array Array of Resque worker process IDs. |
||||
| 527 | */ |
||||
| 528 | 15 | public function workerPids() |
|||
| 529 | { |
||||
| 530 | $pids = array(); |
||||
| 531 | if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') { |
||||
| 532 | exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput); |
||||
| 533 | foreach ($cmdOutput as $line) { |
||||
| 534 | $line = preg_replace('/\s+/m', ' ', $line); |
||||
| 535 | list(,,$pids[]) = explode(' ', trim($line), 3); |
||||
| 536 | 15 | } |
|||
| 537 | } else { |
||||
| 538 | 15 | exec('ps -A -o pid,args | grep [r]esque', $cmdOutput); |
|||
| 539 | 15 | foreach ($cmdOutput as $line) { |
|||
| 540 | list($pids[],) = explode(' ', trim($line), 2); |
||||
| 541 | } |
||||
| 542 | } |
||||
| 543 | return $pids; |
||||
| 544 | } |
||||
| 545 | |||||
| 546 | /** |
||||
| 547 | 15 | * Register this worker in Redis. |
|||
| 548 | 15 | */ |
|||
| 549 | 15 | public function registerWorker() |
|||
| 550 | { |
||||
| 551 | Resque::redis()->sadd('workers', (string)$this); |
||||
|
0 ignored issues
–
show
The method
sadd() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 552 | 15 | Resque::redis()->set('worker:' . (string)$this . ':started', date('c')); |
|||
|
0 ignored issues
–
show
The method
set() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 553 | } |
||||
| 554 | |||||
| 555 | /** |
||||
| 556 | * Unregister this worker in Redis. (shutdown etc) |
||||
| 557 | */ |
||||
| 558 | 60 | public function unregisterWorker() |
|||
| 559 | { |
||||
| 560 | 60 | if (is_object($this->currentJob)) { |
|||
| 561 | 60 | $this->currentJob->fail(new Resque_Job_DirtyExitException()); |
|||
| 562 | 60 | } |
|||
| 563 | |||||
| 564 | $id = (string)$this; |
||||
| 565 | Resque::redis()->srem('workers', $id); |
||||
|
0 ignored issues
–
show
The method
srem() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 566 | Resque::redis()->del('worker:' . $id); |
||||
|
0 ignored issues
–
show
The method
del() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 567 | 17 | Resque::redis()->del('worker:' . $id . ':started'); |
|||
| 568 | Resque_Stat::clear('processed:' . $id); |
||||
| 569 | 17 | Resque_Stat::clear('failed:' . $id); |
|||
| 570 | 1 | } |
|||
| 571 | |||||
| 572 | /** |
||||
| 573 | 17 | * Tell Redis which job we're currently working on. |
|||
| 574 | 17 | * |
|||
| 575 | 17 | * @param object $job Resque_Job instance containing the job we're working on. |
|||
| 576 | 17 | */ |
|||
| 577 | 17 | public function workingOn(Resque_Job $job) |
|||
| 578 | 17 | { |
|||
| 579 | 17 | $job->worker = $this; |
|||
| 580 | $this->currentJob = $job; |
||||
| 581 | $job->updateStatus(Resque_Job_Status::STATUS_RUNNING); |
||||
| 582 | $data = json_encode(array( |
||||
| 583 | 'queue' => $job->queue, |
||||
| 584 | 'run_at' => date('c'), |
||||
| 585 | 'payload' => $job->payload |
||||
| 586 | 14 | )); |
|||
| 587 | Resque::redis()->set('worker:' . $job->worker, $data); |
||||
| 588 | 14 | } |
|||
| 589 | 14 | ||||
| 590 | 14 | /** |
|||
| 591 | 14 | * Notify Redis that we've finished working on a job, clearing the working |
|||
| 592 | 14 | * state and incrementing the job stats. |
|||
| 593 | 14 | */ |
|||
| 594 | 14 | public function doneWorking() |
|||
| 595 | { |
||||
| 596 | 14 | $this->currentJob = null; |
|||
| 597 | 14 | Resque_Stat::incr('processed'); |
|||
| 598 | Resque_Stat::incr('processed:' . (string)$this); |
||||
| 599 | Resque::redis()->del('worker:' . (string)$this); |
||||
| 600 | } |
||||
| 601 | |||||
| 602 | /** |
||||
| 603 | 10 | * Generate a string representation of this worker. |
|||
| 604 | * |
||||
| 605 | 10 | * @return string String identifier for this worker instance. |
|||
| 606 | 10 | */ |
|||
| 607 | 10 | public function __toString() |
|||
| 608 | 10 | { |
|||
| 609 | 10 | return $this->id; |
|||
| 610 | } |
||||
| 611 | |||||
| 612 | /** |
||||
| 613 | * Return an object describing the job this worker is currently working on. |
||||
| 614 | * |
||||
| 615 | * @return object Object with details of current job. |
||||
| 616 | 63 | */ |
|||
| 617 | public function job() |
||||
| 618 | 63 | { |
|||
| 619 | $job = Resque::redis()->get('worker:' . $this); |
||||
|
0 ignored issues
–
show
The method
get() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 620 | if (!$job) { |
||||
| 621 | return array(); |
||||
|
0 ignored issues
–
show
|
|||||
| 622 | } else { |
||||
| 623 | return json_decode($job, true); |
||||
| 624 | } |
||||
| 625 | } |
||||
| 626 | 2 | ||||
| 627 | /** |
||||
| 628 | 2 | * Get a statistic belonging to this worker. |
|||
| 629 | 2 | * |
|||
| 630 | 1 | * @param string $stat Statistic to fetch. |
|||
| 631 | * @return int Statistic value. |
||||
| 632 | */ |
||||
| 633 | 1 | public function getStat($stat) |
|||
| 634 | { |
||||
| 635 | return Resque_Stat::get($stat . ':' . $this); |
||||
| 636 | } |
||||
| 637 | |||||
| 638 | /** |
||||
| 639 | * Inject the logging object into the worker |
||||
| 640 | * |
||||
| 641 | * @param Psr\Log\LoggerInterface $logger |
||||
| 642 | */ |
||||
| 643 | 1 | public function setLogger(Psr\Log\LoggerInterface $logger) |
|||
| 644 | { |
||||
| 645 | 1 | $this->logger = $logger; |
|||
| 646 | } |
||||
| 647 | } |
||||
| 648 |