<?php

declare(ticks=1);

/**
 * Resque worker that handles checking queues for jobs, fetching them
 * off the queues, running them and handling the result.
 *
 * @package		Resque/Worker
 * @author		Chris Boulton <[email protected]>
 * @license		http://www.opensource.org/licenses/mit-license.php
 */
class Resque_Worker
{
    /**
     * @var string Prefix for the process name
     */
    private static $processPrefix = 'resque';

    /**
     * @var Psr\Log\LoggerInterface Logging object that implements the PSR-3 LoggerInterface
     */
    public $logger;

    /**
     * @var bool Whether this worker is running in a forked child process.
     */
    public $hasParent = false;

    /**
     * @var array Array of all associated queues for this worker.
     */
    private $queues = array();

    /**
     * @var string The hostname of this worker.
     */
    private $hostname;

    /**
     * @var boolean True if on the next iteration, the worker should shutdown.
     */
    private $shutdown = false;

    /**
     * @var boolean True if this worker is paused.
     */
    private $paused = false;

    /**
     * @var string String identifying this worker ("hostname:pid:queue1,queue2").
     */
    private $id;

    /**
     * @var Resque_Job Current job, if any, being processed by this worker.
     */
    private $currentJob = null;

    /**
     * @var int Process ID of child worker processes.
     */
    private $child = null;

    /**
     * Instantiate a new worker, given a list of queues that it should be working
     * on. The list of queues should be supplied in the priority that they should
     * be checked for jobs (first come, first served)
     *
     * Passing a single '*' allows the worker to work on all queues in alphabetical
     * order. You can easily add new queues dynamically and have them worked on using
     * this method.
     *
     * The worker ID is built as "<hostname>:<pid>:<comma separated queues>".
     *
     * @param string|array $queues String with a single queue name, array with multiple.
     */
    public function __construct($queues)
    {
        $this->logger = new Resque_Log();

        if (!is_array($queues)) {
            $queues = array($queues);
        }

        $this->queues = $queues;
        $this->hostname = php_uname('n');

        $this->id = $this->hostname . ':' . getmypid() . ':' . implode(',', $this->queues);
    }

    /**
     * Set the process prefix of the workers to the given prefix string.
     *
     * @param string $prefix The new process prefix
     */
    public static function setProcessPrefix($prefix)
    {
        self::$processPrefix = $prefix;
    }

    /**
     * Return all workers known to Resque as instantiated instances.
     *
     * @return array One entry per ID in the 'workers' set. An entry is the
     *               result of find(), so it is false for an ID find() rejects.
     */
    public static function all()
    {
        $workers = Resque::redis()->smembers('workers');
        if (!is_array($workers)) {
            $workers = array();
        }

        $instances = array();
        foreach ($workers as $workerId) {
            $instances[] = self::find($workerId);
        }

        return $instances;
    }

    /**
     * Given a worker ID, check if it is registered/valid.
     *
     * @param string $workerId ID of the worker.
     * @return boolean True if the worker exists, false if not.
     */
    public static function exists($workerId)
    {
        return (bool)Resque::redis()->sismember('workers', $workerId);
    }

    /**
     * Given a worker ID, find it and return an instantiated worker class for it.
     *
     * @param string $workerId The ID of the worker.
     * @return Resque_Worker|false Instance of the worker. False if the worker
     *                             does not exist or the ID contains no ':'.
     */
    public static function find($workerId)
    {
        if (!self::exists($workerId) || false === strpos($workerId, ":")) {
            return false;
        }

        // ID layout is "hostname:pid:queues"; only the queue list is needed here.
        $parts = explode(':', $workerId, 3);
        $queues = explode(',', $parts[2]);

        $worker = new self($queues);
        $worker->setId($workerId);

        return $worker;
    }

    /**
     * Set the ID of this worker to a given ID string.
     *
     * @param string $workerId ID for the worker.
     */
    public function setId($workerId)
    {
        $this->id = $workerId;
    }

    /**
     * The primary loop for a worker which when called on an instance starts
     * the worker's life cycle.
     *
     * Queues are checked every $interval (seconds) for new jobs. Each reserved
     * job is run in a forked child when Resque::fork() yields a child PID;
     * when it returns false or -1 the job is run in the current process.
     *
     * @param int  $interval How often to check for new jobs across the queues.
     *                       An interval of 0 makes the loop exit as soon as no
     *                       job is found.
     * @param bool $blocking When true, jobs are reserved through the blocking
     *                       reserve path with $interval as its timeout, and the
     *                       loop does not sleep between attempts.
     */
    public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
    {
        $this->updateProcLine('Starting');
        $this->startup();

        if (function_exists('pcntl_signal_dispatch')) {
            pcntl_signal_dispatch();
        }

        while (true) {
            if ($this->shutdown) {
                break;
            }

            // is redis still alive?
            try {
                if (!$this->paused && Resque::redis()->ping() === false) {
                    throw new CredisException('redis ping() failed');
                }
            } catch (CredisException $e) {
                $this->logger->log(Psr\Log\LogLevel::ERROR, 'redis went away. trying to reconnect');
                // Drop the cached connection so the next Resque::redis() call can build a new one.
                Resque::$redis = null;
                usleep($interval * 1000000);
                continue;
            }

            // Attempt to find and reserve a job
            $job = false;
            if (!$this->paused) {
                if ($blocking === true) {
                    $this->logger->log(Psr\Log\LogLevel::INFO, 'Starting blocking with timeout of {interval}', array('interval' => $interval));
                    $this->updateProcLine('Waiting with blocking timeout ' . $interval);
                } else {
                    $this->updateProcLine('Waiting with interval ' . $interval);
                }

                $job = $this->reserve($blocking, $interval);
            }

            if (!$job) {
                // For an interval of 0, break now - helps with unit testing etc
                if ($interval == 0) {
                    break;
                }

                if ($blocking === false) {
                    // If no job was found, we sleep for $interval before continuing and checking again
                    $this->logger->log(Psr\Log\LogLevel::INFO, 'Sleeping for {interval}', array('interval' => $interval));
                    if ($this->paused) {
                        $this->updateProcLine('Paused');
                    } else {
                        $this->updateProcLine('Waiting');
                    }

                    usleep($interval * 1000000);
                }

                continue;
            }

            $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Starting work on {job}', array('job' => $job));
            Resque_Event::trigger('beforeFork', $job);
            $this->workingOn($job);

            $this->child = Resque::fork();

            // Forked and we're the child. Or PCNTL is not installed. Run the job.
            if ($this->child === 0 || $this->child === false || $this->child === -1) {
                // date('Y-m-d H:i:s') yields the same text as the deprecated strftime('%F %T').
                $status = 'Processing ' . $job->queue . ' since ' . date('Y-m-d H:i:s');
                $this->updateProcLine($status);
                $this->logger->log(Psr\Log\LogLevel::INFO, $status);

                if (!empty($job->payload['id'])) {
                    Resque_Job_PID::create($job->payload['id']);
                }

                $this->perform($job);

                if (!empty($job->payload['id'])) {
                    Resque_Job_PID::del($job->payload['id']);
                }

                // Only a real forked child exits; the in-process path keeps looping.
                if ($this->child === 0) {
                    exit(0);
                }
            }

            if ($this->child > 0) {
                // Parent process, sit and wait
                $status = 'Forked ' . $this->child . ' at ' . date('Y-m-d H:i:s');
                $this->updateProcLine($status);
                $this->logger->log(Psr\Log\LogLevel::INFO, $status);

                // Wait until the child process finishes before continuing
                $waitStatus = 0;
                while (pcntl_wait($waitStatus, WNOHANG) === 0) {
                    if (function_exists('pcntl_signal_dispatch')) {
                        pcntl_signal_dispatch();
                    }

                    // Pause for a half a second to conserve system resources
                    usleep(500000);
                }

                if (pcntl_wifexited($waitStatus) !== true) {
                    $job->fail(new Resque_Job_DirtyExitException('Job exited abnormally'));
                } elseif (($exitStatus = pcntl_wexitstatus($waitStatus)) !== 0) {
                    $job->fail(new Resque_Job_DirtyExitException(
                        'Job exited with exit code ' . $exitStatus
                    ));
                } else {
                    // Clean exit: mark complete only if the job is still waiting or running.
                    if (in_array($job->getStatus(), array(Resque_Job_Status::STATUS_WAITING, Resque_Job_Status::STATUS_RUNNING))) {
                        $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE);
                        $this->logger->log(Psr\Log\LogLevel::INFO, 'done ' . $job);
                    }
                }
            }

            $this->child = null;
            $this->doneWorking();
        }

        $this->unregisterWorker();
    }

    /**
     * Process a single job.
     *
     * Triggers the 'afterFork' event, runs the job and records the outcome:
     * on an Exception or Error the job is failed and logged as CRITICAL,
     * otherwise its status is updated to complete with the job's result.
     *
     * @param Resque_Job $job The job to be processed.
     */
    public function perform(Resque_Job $job)
    {
        try {
            Resque_Event::trigger('afterFork', $job);
            $result = $job->perform();
        } catch (Exception $e) {
            $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
            $job->fail($e);
            return;
        } catch (Error $e) {
            // Separate catch keeps the code valid on PHP versions without Throwable.
            $this->logger->log(Psr\Log\LogLevel::CRITICAL, '{job} has failed {exception}', array('job' => $job, 'exception' => $e));
            $job->fail($e);
            return;
        }

        $job->updateStatus(Resque_Job_Status::STATUS_COMPLETE, $result);
        $this->logger->log(Psr\Log\LogLevel::NOTICE, '{job} has finished', array('job' => $job));
    }

    /**
     * Try to reserve the next job from this worker's queues.
     *
     * If this worker has a parent process and that parent no longer responds
     * to signal 0, the worker schedules its own shutdown and reserves nothing.
     *
     * @param  bool            $blocking When true, use the blocking reserve across all queues.
     * @param  int             $timeout  Timeout in seconds for the blocking reserve; also the
     *                                   sleep time when blocking with no queues available.
     * @return object|boolean  Instance of Resque_Job if a job is found, false if not.
     */
    public function reserve($blocking = false, $timeout = null)
    {
        if ($this->hasParent && !posix_kill(posix_getppid(), 0)) {
            $this->shutdown();
            return false;
        }

        $queues = $this->queues();
        if (!is_array($queues)) {
            // Honour the documented contract instead of returning null.
            return false;
        }

        if ($blocking === true) {
            if (empty($queues)) {
                $this->logger->log(Psr\Log\LogLevel::INFO, 'No queue was found, sleeping for {interval}', array('interval' => $timeout));
                usleep($timeout * 1000000);
                return false;
            }
            $job = Resque_Job::reserveBlocking($queues, $timeout);
            if ($job) {
                $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
                return $job;
            }
        } else {
            // Queues are polled in order, so earlier queues take priority.
            foreach ($queues as $queue) {
                $this->logger->log(Psr\Log\LogLevel::INFO, 'Checking {queue} for jobs', array('queue' => $queue));
                $job = Resque_Job::reserve($queue);
                if ($job) {
                    $this->logger->log(Psr\Log\LogLevel::INFO, 'Found job on {queue}', array('queue' => $job->queue));
                    return $job;
                }
            }
        }

        return false;
    }

    /**
     * Return an array containing all of the queues that this worker should use
     * when searching for jobs.
     *
     * If * is found in the list of queues, every queue will be searched in
     * alphabetic order. (@see $fetch)
     *
     * @param boolean $fetch If true, and the queue is set to *, will fetch
     * all queue names from redis.
     * @return array Array of associated queues.
     */
    public function queues($fetch = true)
    {
        if (!in_array('*', $this->queues) || $fetch == false) {
            return $this->queues;
        }

        $queues = Resque::queues();
        sort($queues);

        return $queues;
    }

    /**
     * Perform necessary actions to start a worker: install signal handlers,
     * prune dead workers, fire the 'beforeFirstFork' event and register
     * this worker.
     */
    private function startup()
    {
        $this->registerSigHandlers();
        $this->pruneDeadWorkers();
        Resque_Event::trigger('beforeFirstFork', $this);
        $this->registerWorker();
    }

    /**
     * Update the name of the currently running process to indicate the
     * current state of a worker.
     *
     * Uses cli_set_process_title() when it exists and the OS is not Darwin,
     * otherwise setproctitle() when that exists. Does nothing when neither
     * is available.
     *
     * @param string $status The updated process title.
     */
    private function updateProcLine($status)
    {
        // self:: (not static::) because the property is private to this class.
        $processTitle = self::$processPrefix . '-' . Resque::VERSION . ' (' . implode(',', $this->queues) . '): ' . $status;
        if (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
            cli_set_process_title($processTitle);
        } elseif (function_exists('setproctitle')) {
            setproctitle($processTitle);
        }
    }

    /**
     * Register signal handlers that a worker should respond to.
     *
     * TERM: Shutdown immediately and stop processing jobs.
     * INT: Shutdown immediately and stop processing jobs.
     * QUIT: Shutdown after the current job finishes processing.
     * USR1: Kill the forked child immediately and continue processing jobs.
     * USR2: Pause processing of new jobs.
     * CONT: Resume processing of new jobs.
     *
     * Does nothing when pcntl_signal() is unavailable.
     */
    private function registerSigHandlers()
    {
        if (!function_exists('pcntl_signal')) {
            return;
        }

        pcntl_signal(SIGTERM, array($this, 'shutdownNow'));
        pcntl_signal(SIGINT, array($this, 'shutdownNow'));
        pcntl_signal(SIGQUIT, array($this, 'shutdown'));
        pcntl_signal(SIGUSR1, array($this, 'killChild'));
        pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
        pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
        $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Registered signals');
    }

    /**
     * Signal handler callback for USR2, pauses processing of new jobs.
     */
    public function pauseProcessing()
    {
        $this->logger->log(Psr\Log\LogLevel::NOTICE, 'USR2 received; pausing job processing');
        $this->paused = true;
    }

    /**
     * Signal handler callback for CONT, resumes worker allowing it to pick
     * up new jobs.
     */
    public function unPauseProcessing()
    {
        $this->logger->log(Psr\Log\LogLevel::NOTICE, 'CONT received; resuming job processing');
        $this->paused = false;
    }

    /**
     * Schedule a worker for shutdown. Sets the shutdown flag, which the work
     * loop checks at the start of its next iteration.
     */
    public function shutdown()
    {
        $this->shutdown = true;
        $this->logger->log(Psr\Log\LogLevel::NOTICE, 'Shutting down');
    }

    /**
     * Force an immediate shutdown of the worker, killing any child jobs
     * currently running.
     */
    public function shutdownNow()
    {
        $this->shutdown();
        $this->killChild();
    }

    /**
     * @return int Child process PID, or null when no child is being tracked.
     */
    public function getChildPID()
    {
        return $this->child;
    }

    /**
     * Kill a forked child job immediately. The job it is processing will not
     * be completed.
     *
     * If `ps` cannot find the child process, the worker is scheduled for
     * shutdown instead.
     */
    public function killChild()
    {
        if (!$this->child) {
            $this->logger->log(Psr\Log\LogLevel::DEBUG, 'No child to kill.');
            return;
        }

        $this->logger->log(Psr\Log\LogLevel::INFO, 'Killing child at {child}', array('child' => $this->child));
        if (exec('ps -o pid,s -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
            $this->logger->log(Psr\Log\LogLevel::DEBUG, 'Child {child} found, killing.', array('child' => $this->child));
            posix_kill($this->child, SIGKILL);
            $this->child = null;
        } else {
            $this->logger->log(Psr\Log\LogLevel::INFO, 'Child {child} not found, restarting.', array('child' => $this->child));
            $this->shutdown();
        }
    }

    /**
     * Look for any workers which should be running on this server and if
     * they're not, remove them from Redis.
     *
     * This is a form of garbage collection to handle cases where the
     * server may have been killed and the Resque workers did not die gracefully
     * and therefore leave state information in Redis.
     *
     * Workers registered under another hostname, workers whose PID is in
     * workerPids(), and this process itself are left untouched.
     */
    public function pruneDeadWorkers()
    {
        $workerPids = $this->workerPids();
        $workers = self::all();
        foreach ($workers as $worker) {
            // all() may contain false for IDs that find() rejected.
            if (is_object($worker)) {
                list($host, $pid) = explode(':', (string)$worker, 3);
                if ($host != $this->hostname || in_array($pid, $workerPids) || $pid == getmypid()) {
                    continue;
                }
                $this->logger->log(Psr\Log\LogLevel::INFO, 'Pruning dead worker: {worker}', array('worker' => (string)$worker));
                $worker->unregisterWorker();
            }
        }
    }

    /**
     * Return an array of process IDs for all of the Resque workers currently
     * running on this machine.
     *
     * Processes are found by matching "resque" in the process list (WMIC on
     * Windows, `ps` elsewhere).
     *
     * @return array Array of Resque worker process IDs.
     */
    public function workerPids()
    {
        $pids = array();
        if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
            exec('WMIC path win32_process get Processid,Commandline | findstr resque | findstr /V findstr', $cmdOutput);
            foreach ($cmdOutput as $line) {
                $line = preg_replace('/\s+/m', ' ', $line);
                list(,,$pids[]) = explode(' ', trim($line), 3);
            }
        } else {
            // "[r]esque" keeps the grep process itself out of the results.
            exec('ps -A -o pid,args | grep [r]esque', $cmdOutput);
            foreach ($cmdOutput as $line) {
                list($pids[],) = explode(' ', trim($line), 2);
            }
        }

        return $pids;
    }

    /**
     * Register this worker in Redis: add its ID to the 'workers' set and
     * store its start time under "worker:<id>:started".
     */
    public function registerWorker()
    {
        Resque::redis()->sadd('workers', (string)$this);
        Resque::redis()->set('worker:' . (string)$this . ':started', date('c'));
    }

    /**
     * Unregister this worker in Redis. (shutdown etc)
     *
     * A job still marked as current is failed with a dirty-exit exception.
     * The worker's set entry, keys and per-worker statistics are then removed.
     */
    public function unregisterWorker()
    {
        if (is_object($this->currentJob)) {
            $this->currentJob->fail(new Resque_Job_DirtyExitException());
        }

        $id = (string)$this;
        Resque::redis()->srem('workers', $id);
        Resque::redis()->del('worker:' . $id);
        Resque::redis()->del('worker:' . $id . ':started');
        Resque_Stat::clear('processed:' . $id);
        Resque_Stat::clear('failed:' . $id);
    }

    /**
     * Tell Redis which job we're currently working on.
     *
     * Marks the job as running and stores a JSON document with the keys
     * 'queue', 'run_at' and 'payload' under "worker:<id>".
     *
     * @param object $job Resque_Job instance containing the job we're working on.
     */
    public function workingOn(Resque_Job $job)
    {
        $job->worker = $this;
        $this->currentJob = $job;
        $job->updateStatus(Resque_Job_Status::STATUS_RUNNING);
        $data = json_encode(array(
            'queue' => $job->queue,
            'run_at' => date('c'),
            'payload' => $job->payload
        ));
        Resque::redis()->set('worker:' . $job->worker, $data);
    }

    /**
     * Notify Redis that we've finished working on a job, clearing the working
     * state and incrementing the job stats.
     */
    public function doneWorking()
    {
        $this->currentJob = null;
        Resque_Stat::incr('processed');
        Resque_Stat::incr('processed:' . (string)$this);
        Resque::redis()->del('worker:' . (string)$this);
    }

    /**
     * Generate a string representation of this worker.
     *
     * @return string String identifier for this worker instance.
     */
    public function __toString()
    {
        return $this->id;
    }

    /**
     * Return the data describing the job this worker is currently working on.
     *
     * @return array Decoded contents of the "worker:<id>" key, or an empty
     *               array when that key holds nothing.
     */
    public function job()
    {
        $job = Resque::redis()->get('worker:' . $this);
        if (!$job) {
            return array();
        }

        return json_decode($job, true);
    }

    /**
     * Get a statistic belonging to this worker.
     *
     * @param string $stat Statistic to fetch.
     * @return int Statistic value, as returned by Resque_Stat::get() for "<stat>:<id>".
     */
    public function getStat($stat)
    {
        return Resque_Stat::get($stat . ':' . $this);
    }

    /**
     * Inject the logging object into the worker
     *
     * @param Psr\Log\LoggerInterface $logger
     */
    public function setLogger(Psr\Log\LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
}