resque /
php-resque
<?php
/**
 * ResqueScheduler worker to handle scheduling of delayed tasks.
 *
 * @package   ResqueScheduler
 * @author    Chris Boulton <[email protected]>
 * @copyright (c) 2012 Chris Boulton
 * @license   http://www.opensource.org/licenses/mit-license.php
 */
class ResqueScheduler_Worker
{
    const LOG_NONE = 0;
    const LOG_NORMAL = 1;
    const LOG_VERBOSE = 2;

    /**
     * @var int Current log level of this worker (one of the LOG_* constants).
     */
    public $logLevel = 0;

    /**
     * @var int Interval (in seconds) to sleep for between checking schedules.
     */
    protected $interval = 5;

    /**
     * The primary loop for a worker.
     *
     * Every $interval (seconds), the scheduled queue will be checked for jobs
     * that should be pushed to Resque. This method loops forever and never
     * returns.
     *
     * @param int|null $interval How often to check schedules, in seconds.
     *                           When null, the current interval is kept.
     */
    public function work($interval = null)
    {
        if ($interval !== null) {
            $this->interval = $interval;
        }

        $this->updateProcLine('Starting');

        while (true) {
            $this->handleDelayedItems();
            $this->sleep();
        }
    }

    /**
     * Handle delayed items for the next scheduled timestamp.
     *
     * Searches for any items that are due to be scheduled in Resque
     * and adds them to the appropriate job queue in Resque. Keeps going
     * until ResqueScheduler::nextDelayedTimestamp() reports (with false)
     * that no further timestamp is due.
     *
     * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule.
     */
    public function handleDelayedItems($timestamp = null)
    {
        while (($oldestJobTimestamp = ResqueScheduler::nextDelayedTimestamp($timestamp)) !== false) {
            $this->updateProcLine('Processing Delayed Items');
            $this->enqueueDelayedItemsForTimestamp($oldestJobTimestamp);
        }
    }

    /**
     * Schedule all of the delayed jobs for a given timestamp.
     *
     * Searches for all items for a given timestamp, pulls them off the list of
     * delayed jobs and pushes them across to Resque. The 'beforeDelayedEnqueue'
     * event is triggered for every item before it is enqueued.
     *
     * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule.
     */
    public function enqueueDelayedItemsForTimestamp($timestamp)
    {
        // The loop stops on the first falsy value returned by the scheduler.
        while ($item = ResqueScheduler::nextItemForTimestamp($timestamp)) {
            $this->log('queueing ' . $item['class'] . ' in ' . $item['queue'] .' [delayed]');

            Resque_Event::trigger('beforeDelayedEnqueue', array(
                'queue' => $item['queue'],
                'class' => $item['class'],
                'args'  => $item['args'],
            ));

            // Resque::enqueue() is called as enqueue($queue, $class, ...$args).
            $payload = array_merge(array($item['queue'], $item['class']), $item['args']);
            call_user_func_array('Resque::enqueue', $payload);
        }
    }

    /**
     * Sleep for the defined interval.
     */
    protected function sleep()
    {
        sleep($this->interval);
    }

    /**
     * Update the status of the current worker process.
     *
     * On supported systems, update the name of the currently running process
     * to indicate the current state of a worker. The built-in
     * cli_set_process_title() (PHP >= 5.5, CLI only) is preferred; the PECL
     * proctitle module's setproctitle() is used as a fallback. When neither
     * is available this method does nothing.
     *
     * @param string $status The updated process title.
     */
    private function updateProcLine($status)
    {
        // NOTE(review): cli_set_process_title() is skipped on Darwin because it
        // is reported to raise a warning there; confirm against target platforms.
        if (function_exists('cli_set_process_title') && PHP_OS !== 'Darwin') {
            $setTitle = 'cli_set_process_title';
        } elseif (function_exists('setproctitle')) {
            $setTitle = 'setproctitle';
        } else {
            return;
        }

        $setTitle('resque-scheduler-' . ResqueScheduler::VERSION . ': ' . $status);
    }

    /**
     * Output a given log message to STDOUT.
     *
     * Nothing is written unless the log level is LOG_NORMAL or LOG_VERBOSE.
     * In verbose mode the message is prefixed with the current local time.
     *
     * @param string $message Message to output.
     */
    public function log($message)
    {
        // Loose comparison is kept on purpose so numeric-string log levels
        // assigned by callers continue to work.
        if ($this->logLevel == self::LOG_NORMAL) {
            fwrite(STDOUT, "*** " . $message . "\n");
        } elseif ($this->logLevel == self::LOG_VERBOSE) {
            // date('H:i:s Y-m-d') yields the same text as the deprecated
            // strftime('%T %Y-%m-%d').
            fwrite(STDOUT, "** [" . date('H:i:s Y-m-d') . "] " . $message . "\n");
        }
    }
}