<?php

// Ticks are enabled so that the handlers ResqueScheduler_Worker installs with
// pcntl_signal() get dispatched while the worker loop in this file is running.
declare(ticks=1);

/**
 * ResqueScheduler worker to handle scheduling of delayed tasks.
 *
 * The worker repeatedly asks ResqueScheduler for delayed items that are due
 * and hands each of them to Resque::enqueue(), sleeping between checks.
 *
 * @package   ResqueScheduler
 * @author    Chris Boulton <[email protected]>
 * @copyright (c) 2012 Chris Boulton
 * @license   http://www.opensource.org/licenses/mit-license.php
 */
class ResqueScheduler_Worker
{
    const LOG_NONE = 0;
    const LOG_NORMAL = 1;
    const LOG_VERBOSE = 2;

    /**
     * @var int Current log level of this worker (one of the LOG_* constants).
     */
    public $logLevel = 0;

    /**
     * @var int Interval (in seconds) to sleep for between checking schedules.
     */
    protected $interval = 5;

    /**
     * @var boolean True if on the next iteration, the worker should shutdown.
     */
    private $shutdown = false;

    /**
     * @var boolean True if this worker is paused.
     */
    private $paused = false;

    /**
     * The primary loop for a worker.
     *
     * Every $interval (seconds), the scheduled queue will be checked for jobs
     * that should be pushed to Resque. The loop ends once shutdown() has been
     * called; while the worker is paused it keeps sleeping but does not
     * process any delayed items.
     *
     * @param int $interval How often to check schedules. When null, the
     *                      currently configured interval is kept.
     */
    public function work($interval = null)
    {
        if ($interval !== null) {
            $this->interval = $interval;
        }

        $this->updateProcLine('Starting');
        $this->registerSigHandlers();

        while (!$this->shutdown) {
            if (!$this->paused) {
                $this->handleDelayedItems();
            }

            // A shutdown requested while items were being processed should
            // take effect right away instead of after another full sleep.
            if ($this->shutdown) {
                break;
            }

            $this->sleep();
        }
    }

    /**
     * Handle delayed items for the next scheduled timestamp.
     *
     * Searches for any items that are due to be scheduled in Resque
     * and adds them to the appropriate job queue in Resque. Timestamps are
     * processed one after another until ResqueScheduler reports that none is
     * left (false) or a shutdown has been requested.
     *
     * @param DateTime|int $timestamp Search for any items up to this timestamp to schedule.
     */
    public function handleDelayedItems($timestamp = null)
    {
        while (($oldestJobTimestamp = ResqueScheduler::nextDelayedTimestamp($timestamp)) !== false) {
            $this->updateProcLine('Processing Delayed Items');
            $this->enqueueDelayedItemsForTimestamp($oldestJobTimestamp);

            // Honour the signal contract documented on registerSigHandlers():
            // stop once the current timestamp has been processed.
            if ($this->shutdown) {
                break;
            }
        }
    }

    /**
     * Schedule all of the delayed jobs for a given timestamp.
     *
     * Searches for all items for a given timestamp, pulls them off the list of
     * delayed jobs and pushes them across to Resque. The
     * 'beforeDelayedEnqueue' event is triggered for every item before it is
     * enqueued.
     *
     * @param DateTime|int $timestamp Timestamp whose delayed items should be enqueued.
     */
    public function enqueueDelayedItemsForTimestamp($timestamp)
    {
        while ($item = ResqueScheduler::nextItemForTimestamp($timestamp)) {
            $this->log('queueing ' . $item['class'] . ' in ' . $item['queue'] . ' [delayed]');

            Resque_Event::trigger('beforeDelayedEnqueue', array(
                'queue' => $item['queue'],
                'class' => $item['class'],
                'args'  => $item['args'],
            ));

            // Resque::enqueue() is called as enqueue($queue, $class, ...$args).
            $payload = array_merge(array($item['queue'], $item['class']), $item['args']);
            call_user_func_array('Resque::enqueue', $payload);
        }
    }

    /**
     * Sleep for the defined interval.
     */
    protected function sleep()
    {
        sleep($this->interval);
    }

    /**
     * Update the status of the current worker process.
     *
     * On supported systems (with the PECL proctitle module installed), update
     * the name of the currently running process to indicate the current state
     * of a worker. Without that module this is a no-op.
     *
     * @param string $status The updated process title.
     */
    private function updateProcLine($status)
    {
        if (function_exists('setproctitle')) {
            setproctitle('resque-scheduler-' . ResqueScheduler::VERSION . ': ' . $status);
        }
    }

    /**
     * Output a given log message to STDOUT.
     *
     * LOG_NORMAL prefixes the message with "*** "; LOG_VERBOSE additionally
     * includes the current time. Any other log level writes nothing.
     *
     * @param string $message Message to output.
     */
    public function log($message)
    {
        // Loose comparison is kept on purpose so a level supplied as a
        // numeric string still matches the integer constants.
        if ($this->logLevel == self::LOG_NORMAL) {
            fwrite(STDOUT, "*** " . $message . "\n");
        } elseif ($this->logLevel == self::LOG_VERBOSE) {
            // date() replaces the deprecated strftime(); 'H:i:s Y-m-d' yields
            // the same text as the former '%T %Y-%m-%d' format.
            fwrite(STDOUT, "** [" . date('H:i:s Y-m-d') . "] " . $message . "\n");
        }
    }

    /**
     * Register signal handlers that a worker should respond to.
     *
     * TERM: Shutdown after the current timestamp was processed.
     * INT:  Shutdown after the current timestamp was processed.
     * QUIT: Shutdown after the current timestamp was processed.
     * USR2: Pause processing of delayed items.
     * CONT: Resume processing of delayed items.
     *
     * Does nothing when the pcntl extension is not available.
     */
    private function registerSigHandlers()
    {
        if (!function_exists('pcntl_signal')) {
            return;
        }

        pcntl_signal(SIGTERM, array($this, 'shutdown'));
        pcntl_signal(SIGINT, array($this, 'shutdown'));
        pcntl_signal(SIGQUIT, array($this, 'shutdown'));
        pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
        pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));

        $this->log('Registered signals');
    }

    /**
     * Signal handler callback for TERM, INT and QUIT; flags the worker to
     * leave its main loop.
     */
    public function shutdown()
    {
        $this->log('Shutting down');
        $this->shutdown = true;
    }

    /**
     * Signal handler callback for USR2, pauses processing.
     */
    public function pauseProcessing()
    {
        $this->log('USR2 received; pausing processing');
        $this->paused = true;
    }

    /**
     * Signal handler callback for CONT, resume processing.
     */
    public function unPauseProcessing()
    {
        $this->log('CONT received; resuming processing');
        $this->paused = false;
    }
}