1 | <?php |
||||
2 | |||||
3 | /** |
||||
4 | * Base Resque class. |
||||
5 | * |
||||
6 | * @package Resque |
||||
7 | * @author Chris Boulton <[email protected]> |
||||
8 | * @license http://www.opensource.org/licenses/mit-license.php |
||||
9 | */ |
||||
10 | class Resque |
||||
11 | { |
||||
12 | const VERSION = '1.2'; |
||||
13 | |||||
14 | const DEFAULT_INTERVAL = 5; |
||||
15 | |||||
16 | /** |
||||
17 | * @var Resque_Redis Instance of Resque_Redis that talks to redis. |
||||
18 | */ |
||||
19 | public static $redis = null; |
||||
20 | |||||
21 | /** |
||||
22 | * @var mixed Host/port conbination separated by a colon, or a nested |
||||
23 | * array of server swith host/port pairs |
||||
24 | */ |
||||
25 | protected static $redisServer = null; |
||||
26 | |||||
27 | /** |
||||
28 | * @var int ID of Redis database to select. |
||||
29 | */ |
||||
30 | protected static $redisDatabase = 0; |
||||
31 | |||||
32 | /** |
||||
33 | * @var string auth of Redis database |
||||
34 | */ |
||||
35 | protected static $auth; |
||||
36 | |||||
37 | /** |
||||
38 | * Given a host/port combination separated by a colon, set it as |
||||
39 | * the redis server that Resque will talk to. |
||||
40 | * |
||||
41 | * @param mixed $server Host/port combination separated by a colon, DSN-formatted URI, or |
||||
42 | * a callable that receives the configured database ID |
||||
43 | * and returns a Resque_Redis instance, or |
||||
44 | * a nested array of servers with host/port pairs. |
||||
45 | * @param int $database |
||||
46 | * @param string $auth |
||||
47 | */ |
||||
48 | public static function setBackend($server, $database = 0, $auth = null) |
||||
49 | { |
||||
50 | self::$redisServer = $server; |
||||
51 | self::$redisDatabase = $database; |
||||
52 | self::$auth = $auth; |
||||
53 | self::$redis = null; |
||||
54 | } |
||||
55 | |||||
56 | /** |
||||
57 | * Return an instance of the Resque_Redis class instantiated for Resque. |
||||
58 | * |
||||
59 | * @return Resque_Redis Instance of Resque_Redis. |
||||
60 | */ |
||||
61 | public static function redis() |
||||
62 | { |
||||
63 | if (self::$redis !== null) { |
||||
64 | return self::$redis; |
||||
65 | } |
||||
66 | |||||
67 | if (is_callable(self::$redisServer)) { |
||||
68 | self::$redis = call_user_func(self::$redisServer, self::$redisDatabase); |
||||
69 | } else { |
||||
70 | self::$redis = new Resque_Redis(self::$redisServer, self::$redisDatabase); |
||||
71 | } |
||||
72 | |||||
73 | if (!empty(self::$auth)) { |
||||
74 | self::$redis->auth(self::$auth); |
||||
0 ignored issues
–
show
Bug
introduced
by
![]() |
|||||
75 | } |
||||
76 | |||||
77 | return self::$redis; |
||||
78 | } |
||||
79 | |||||
80 | /** |
||||
81 | * fork() helper method for php-resque that handles issues PHP socket |
||||
82 | * and phpredis have with passing around sockets between child/parent |
||||
83 | * processes. |
||||
84 | * |
||||
85 | * Will close connection to Redis before forking. |
||||
86 | * |
||||
87 | * @return int Return vars as per pcntl_fork(). False if pcntl_fork is unavailable |
||||
88 | */ |
||||
89 | public static function fork() |
||||
90 | { |
||||
91 | if (!function_exists('pcntl_fork')) { |
||||
92 | return false; |
||||
93 | } |
||||
94 | |||||
95 | // Close the connection to Redis before forking. |
||||
96 | // This is a workaround for issues phpredis has. |
||||
97 | self::$redis = null; |
||||
98 | |||||
99 | $pid = pcntl_fork(); |
||||
100 | if ($pid === -1) { |
||||
101 | throw new RuntimeException('Unable to fork child worker.'); |
||||
102 | } |
||||
103 | |||||
104 | return $pid; |
||||
105 | } |
||||
106 | |||||
107 | /** |
||||
108 | * Push a job to the end of a specific queue. If the queue does not |
||||
109 | * exist, then create it as well. |
||||
110 | * |
||||
111 | * @param string $queue The name of the queue to add the job to. |
||||
112 | * @param array $item Job description as an array to be JSON encoded. |
||||
113 | */ |
||||
114 | public static function push($queue, $item) |
||||
115 | { |
||||
116 | $encodedItem = json_encode($item); |
||||
117 | if ($encodedItem === false) { |
||||
118 | return false; |
||||
119 | } |
||||
120 | self::redis()->sadd('queues', $queue); |
||||
121 | $length = self::redis()->rpush('queue:' . $queue, $encodedItem); |
||||
122 | if ($length < 1) { |
||||
123 | return false; |
||||
124 | } |
||||
125 | return true; |
||||
126 | } |
||||
127 | |||||
128 | /** |
||||
129 | * Pop an item off the end of the specified queue, decode it and |
||||
130 | * return it. |
||||
131 | * |
||||
132 | * @param string $queue The name of the queue to fetch an item from. |
||||
133 | * @return array Decoded item from the queue. |
||||
134 | */ |
||||
135 | public static function pop($queue) |
||||
136 | { |
||||
137 | $item = self::redis()->lpop('queue:' . $queue); |
||||
138 | |||||
139 | if (!$item) { |
||||
140 | return; |
||||
141 | } |
||||
142 | |||||
143 | return json_decode($item, true); |
||||
144 | } |
||||
145 | |||||
146 | /** |
||||
147 | * Remove items of the specified queue |
||||
148 | * |
||||
149 | * @param string $queue The name of the queue to fetch an item from. |
||||
150 | * @param array $items |
||||
151 | * @return integer number of deleted items |
||||
152 | */ |
||||
153 | public static function dequeue($queue, $items = array()) |
||||
154 | { |
||||
155 | if (count($items) > 0) { |
||||
156 | return self::removeItems($queue, $items); |
||||
157 | } else { |
||||
158 | return self::removeList($queue); |
||||
159 | } |
||||
160 | } |
||||
161 | |||||
162 | /** |
||||
163 | * Remove specified queue |
||||
164 | * |
||||
165 | * @param string $queue The name of the queue to remove. |
||||
166 | * @return integer Number of deleted items |
||||
167 | */ |
||||
168 | public static function removeQueue($queue) |
||||
169 | { |
||||
170 | $num = self::removeList($queue); |
||||
171 | self::redis()->srem('queues', $queue); |
||||
172 | return $num; |
||||
173 | } |
||||
174 | |||||
175 | /** |
||||
176 | * Pop an item off the end of the specified queues, using blocking list pop, |
||||
177 | * decode it and return it. |
||||
178 | * |
||||
179 | * @param array $queues |
||||
180 | * @param int $timeout |
||||
181 | * @return null|array Decoded item from the queue. |
||||
182 | */ |
||||
183 | public static function blpop(array $queues, $timeout) |
||||
184 | { |
||||
185 | $list = array(); |
||||
186 | foreach ($queues as $queue) { |
||||
187 | $list[] = 'queue:' . $queue; |
||||
188 | } |
||||
189 | |||||
190 | $item = self::redis()->blpop($list, (int)$timeout); |
||||
191 | |||||
192 | if (!$item) { |
||||
193 | return; |
||||
194 | } |
||||
195 | |||||
196 | /** |
||||
197 | * Normally the Resque_Redis class returns queue names without the prefix |
||||
198 | * But the blpop is a bit different. It returns the name as prefix:queue:name |
||||
199 | * So we need to strip off the prefix:queue: part |
||||
200 | */ |
||||
201 | $queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:')); |
||||
202 | |||||
203 | return array( |
||||
204 | 'queue' => $queue, |
||||
205 | 'payload' => json_decode($item[1], true) |
||||
206 | ); |
||||
207 | } |
||||
208 | |||||
209 | /** |
||||
210 | * Return the size (number of pending jobs) of the specified queue. |
||||
211 | * |
||||
212 | * @param string $queue name of the queue to be checked for pending jobs |
||||
213 | * |
||||
214 | * @return int The size of the queue. |
||||
215 | */ |
||||
216 | public static function size($queue) |
||||
217 | { |
||||
218 | return self::redis()->llen('queue:' . $queue); |
||||
219 | } |
||||
220 | |||||
221 | /** |
||||
222 | * Create a new job and save it to the specified queue. |
||||
223 | * |
||||
224 | * @param string $queue The name of the queue to place the job in. |
||||
225 | * @param string $class The name of the class that contains the code to execute the job. |
||||
226 | * @param array $args Any optional arguments that should be passed when the job is executed. |
||||
227 | * @param boolean $trackStatus Set to true to be able to monitor the status of a job. |
||||
228 | * @param string $prefix The prefix needs to be set for the status key |
||||
229 | * |
||||
230 | * @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue |
||||
231 | */ |
||||
232 | public static function enqueue($queue, $class, $args = null, $trackStatus = false, $prefix = "") |
||||
233 | { |
||||
234 | $id = Resque::generateJobId(); |
||||
235 | $hookParams = array( |
||||
236 | 'class' => $class, |
||||
237 | 'args' => $args, |
||||
238 | 'queue' => $queue, |
||||
239 | 'id' => $id, |
||||
240 | ); |
||||
241 | try { |
||||
242 | Resque_Event::trigger('beforeEnqueue', $hookParams); |
||||
243 | } catch (Resque_Job_DontCreate $e) { |
||||
244 | return false; |
||||
245 | } |
||||
246 | |||||
247 | Resque_Job::create($queue, $class, $args, $trackStatus, $id, $prefix); |
||||
248 | Resque_Event::trigger('afterEnqueue', $hookParams); |
||||
249 | |||||
250 | return $id; |
||||
251 | } |
||||
252 | |||||
253 | /** |
||||
254 | * Reserve and return the next available job in the specified queue. |
||||
255 | * |
||||
256 | * @param string $queue Queue to fetch next available job from. |
||||
257 | * @return Resque_Job Instance of Resque_Job to be processed, false if none or error. |
||||
258 | */ |
||||
259 | public static function reserve($queue) |
||||
260 | { |
||||
261 | return Resque_Job::reserve($queue); |
||||
262 | } |
||||
263 | |||||
264 | /** |
||||
265 | * Get an array of all known queues. |
||||
266 | * |
||||
267 | * @return array Array of queues. |
||||
268 | */ |
||||
269 | public static function queues() |
||||
270 | { |
||||
271 | $queues = self::redis()->smembers('queues'); |
||||
272 | if (!is_array($queues)) { |
||||
273 | $queues = array(); |
||||
274 | } |
||||
275 | return $queues; |
||||
276 | } |
||||
277 | |||||
278 | /** |
||||
279 | * Retrieve all the items of a queue with Redis |
||||
280 | * |
||||
281 | * @return array Array of items. |
||||
282 | */ |
||||
283 | public static function items($queue, $start = 0, $stop = -1) |
||||
284 | { |
||||
285 | $list = self::redis()->lrange('queue:' . $queue, $start, $stop); |
||||
0 ignored issues
–
show
The method
lrange() does not exist on Resque_Redis . Since you implemented __call , consider adding a @method annotation.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||
286 | if (!is_array($list)) { |
||||
287 | $list = array(); |
||||
288 | } |
||||
289 | return $list; |
||||
290 | } |
||||
291 | |||||
292 | /** |
||||
293 | * Remove Items from the queue |
||||
294 | * Safely moving each item to a temporary queue before processing it |
||||
295 | * If the Job matches, counts otherwise puts it in a requeue_queue |
||||
296 | * which at the end eventually be copied back into the original queue |
||||
297 | * |
||||
298 | * @private |
||||
299 | * |
||||
300 | * @param string $queue The name of the queue |
||||
301 | * @param array $items |
||||
302 | * @return integer number of deleted items |
||||
303 | */ |
||||
304 | private static function removeItems($queue, $items = array()) |
||||
305 | { |
||||
306 | $counter = 0; |
||||
307 | $originalQueue = 'queue:' . $queue; |
||||
308 | $tempQueue = $originalQueue . ':temp:' . time(); |
||||
309 | $requeueQueue = $tempQueue . ':requeue'; |
||||
310 | |||||
311 | // move each item from original queue to temp queue and process it |
||||
312 | $finished = false; |
||||
313 | while (!$finished) { |
||||
314 | $string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue); |
||||
315 | |||||
316 | if (!empty($string)) { |
||||
317 | if (self::matchItem($string, $items)) { |
||||
318 | self::redis()->rpop($tempQueue); |
||||
319 | $counter++; |
||||
320 | } else { |
||||
321 | self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue); |
||||
322 | } |
||||
323 | } else { |
||||
324 | $finished = true; |
||||
325 | } |
||||
326 | } |
||||
327 | |||||
328 | // move back from temp queue to original queue |
||||
329 | $finished = false; |
||||
330 | while (!$finished) { |
||||
331 | $string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() . $originalQueue); |
||||
332 | if (empty($string)) { |
||||
333 | $finished = true; |
||||
334 | } |
||||
335 | } |
||||
336 | |||||
337 | // remove temp queue and requeue queue |
||||
338 | self::redis()->del($requeueQueue); |
||||
339 | self::redis()->del($tempQueue); |
||||
340 | |||||
341 | return $counter; |
||||
342 | } |
||||
343 | |||||
344 | /** |
||||
345 | * matching item |
||||
346 | * item can be ['class'] or ['class' => 'id'] or ['class' => {'foo' => 1, 'bar' => 2}] |
||||
347 | * @private |
||||
348 | * |
||||
349 | * @params string $string redis result in json |
||||
350 | * @params $items |
||||
351 | * |
||||
352 | * @return (bool) |
||||
353 | */ |
||||
354 | private static function matchItem($string, $items) |
||||
355 | { |
||||
356 | $decoded = json_decode($string, true); |
||||
357 | |||||
358 | foreach ($items as $key => $val) { |
||||
359 | # class name only ex: item[0] = ['class'] |
||||
360 | if (is_numeric($key)) { |
||||
361 | if ($decoded['class'] == $val) { |
||||
362 | return true; |
||||
363 | } |
||||
364 | # class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}] |
||||
365 | } elseif (is_array($val)) { |
||||
366 | $decodedArgs = (array)$decoded['args'][0]; |
||||
367 | if ( |
||||
368 | $decoded['class'] == $key && |
||||
369 | count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0 |
||||
370 | ) { |
||||
371 | return true; |
||||
372 | } |
||||
373 | # class name with ID, example: item[0] = ['class' => 'id'] |
||||
374 | } else { |
||||
375 | if ($decoded['class'] == $key && $decoded['id'] == $val) { |
||||
376 | return true; |
||||
377 | } |
||||
378 | } |
||||
379 | } |
||||
380 | return false; |
||||
381 | } |
||||
382 | |||||
383 | /** |
||||
384 | * Remove List |
||||
385 | * |
||||
386 | * @private |
||||
387 | * |
||||
388 | * @params string $queue the name of the queue |
||||
389 | * @return integer number of deleted items belongs to this list |
||||
390 | */ |
||||
391 | private static function removeList($queue) |
||||
392 | { |
||||
393 | $counter = self::size($queue); |
||||
394 | $result = self::redis()->del('queue:' . $queue); |
||||
395 | return ($result == 1) ? $counter : 0; |
||||
396 | } |
||||
397 | |||||
398 | /* |
||||
399 | * Generate an identifier to attach to a job for status tracking. |
||||
400 | * |
||||
401 | * @return string |
||||
402 | */ |
||||
403 | public static function generateJobId() |
||||
404 | { |
||||
405 | return md5(uniqid('', true)); |
||||
406 | } |
||||
407 | } |
||||
408 |