<?php

/**
 * ResqueScheduler core class to handle scheduling of jobs in the future.
 *
 * @package   ResqueScheduler
 * @author    Chris Boulton <[email protected]>
 * @copyright (c) 2012 Chris Boulton
 * @license   http://www.opensource.org/licenses/mit-license.php
 */
class ResqueScheduler
{
    const VERSION = "0.1";

    /**
     * Enqueue a job in a given number of seconds from now.
     *
     * Identical to Resque::enqueue, however the first argument is the number
     * of seconds before the job should be executed. Delegates to enqueueAt()
     * with the current time plus $in.
     *
     * @param int    $in    Number of seconds from now when the job should be executed.
     * @param string $queue The name of the queue to place the job in.
     * @param string $class The name of the class that contains the code to execute the job.
     * @param array  $args  Any optional arguments that should be passed when the job is executed.
     * @throws Resque_Exception When $class or $queue is empty.
     * @throws ResqueScheduler_InvalidTimestampException When the resulting timestamp is not an integer value.
     */
    public static function enqueueIn($in, $queue, $class, array $args = array())
    {
        self::enqueueAt(time() + $in, $queue, $class, $args);
    }

    /**
     * Enqueue a job for execution at a given timestamp.
     *
     * Identical to Resque::enqueue, however the first argument is a timestamp
     * (either UNIX timestamp in integer format or an instance of the DateTime
     * class in PHP).
     *
     * After the job has been stored, the 'afterSchedule' event is triggered
     * with the 'at', 'queue', 'class' and 'args' values exactly as they were
     * passed in ('at' is not normalised).
     *
     * @param DateTime|int $at    Instance of PHP DateTime object or int of UNIX timestamp.
     * @param string       $queue The name of the queue to place the job in.
     * @param string       $class The name of the class that contains the code to execute the job.
     * @param array        $args  Any optional arguments that should be passed when the job is executed.
     * @throws Resque_Exception When $class or $queue is empty.
     * @throws ResqueScheduler_InvalidTimestampException When $at is not an integer value.
     */
    public static function enqueueAt($at, $queue, $class, $args = array())
    {
        self::validateJob($class, $queue);

        $job = self::jobToHash($queue, $class, $args);
        self::delayedPush($at, $job);

        Resque_Event::trigger('afterSchedule', array(
            'at'    => $at,
            'queue' => $queue,
            'class' => $class,
            'args'  => $args,
        ));
    }

    /**
     * Directly append an item to the delayed queue schedule.
     *
     * The JSON-encoded item is appended to the list 'delayed:<timestamp>' and
     * the timestamp is recorded (scored by itself) in the sorted set
     * 'delayed_queue_schedule'.
     *
     * @param DateTime|int $timestamp Timestamp job is scheduled to be run at.
     * @param array        $item      Hash of item to be pushed to schedule.
     * @throws ResqueScheduler_InvalidTimestampException When $timestamp is not an integer value.
     */
    public static function delayedPush($timestamp, $item)
    {
        $timestamp = self::getTimestamp($timestamp);
        $redis = Resque::redis();
        $redis->rpush('delayed:' . $timestamp, json_encode($item));

        $redis->zadd('delayed_queue_schedule', $timestamp, $timestamp);
    }

    /**
     * Get the size of the delayed schedule.
     *
     * This is the cardinality of the 'delayed_queue_schedule' sorted set,
     * whose members are timestamps. It is therefore the number of distinct
     * timestamps in the schedule, not the number of individual jobs.
     *
     * @return int Number of timestamps in the delayed schedule.
     */
    public static function getDelayedQueueScheduleSize()
    {
        return (int)Resque::redis()->zcard('delayed_queue_schedule');
    }

    /**
     * Get the number of jobs for a given timestamp in the delayed schedule.
     *
     * @param DateTime|int $timestamp Timestamp
     * @return int Number of scheduled jobs.
     * @throws ResqueScheduler_InvalidTimestampException When $timestamp is not an integer value.
     */
    public static function getDelayedTimestampSize($timestamp)
    {
        $timestamp = self::getTimestamp($timestamp);

        // LLEN takes the key only; no further arguments are passed.
        return (int)Resque::redis()->llen('delayed:' . $timestamp);
    }

    /**
     * Remove a delayed job from the queue.
     *
     * Note: you must specify exactly the same queue, class and arguments
     * that you used when you added the job to the delayed queue.
     *
     * Also, this is an expensive operation because all delayed keys have to
     * be searched.
     *
     * @param string $queue Name of the queue the job was scheduled on.
     * @param string $class Name of the job class.
     * @param array  $args  Arguments the job was scheduled with.
     * @return int Number of jobs that were removed.
     */
    public static function removeDelayed($queue, $class, $args)
    {
        $destroyed = 0;
        $item = json_encode(self::jobToHash($queue, $class, $args));
        $redis = Resque::redis();

        foreach ($redis->keys('delayed:*') as $key) {
            $key = $redis->removePrefix($key);
            $removed = (int)$redis->lrem($key, 0, $item);

            if ($removed === 0) {
                continue;
            }

            $destroyed += $removed;

            // Keep the schedule consistent with removeDelayedJobFromTimestamp():
            // once a timestamp has no jobs left, drop its schedule entry too.
            // Only keys of the form 'delayed:<integer>' are cleaned up.
            if (preg_match('/^delayed:(-?\d+)$/', $key, $matches)) {
                self::cleanupTimestamp($key, $matches[1]);
            }
        }

        return $destroyed;
    }

    /**
     * Remove a delayed job queued for a specific timestamp.
     *
     * Note: you must specify exactly the same queue, class and arguments
     * that you used when you added the job to the delayed queue.
     *
     * @param DateTime|int $timestamp Timestamp the job was scheduled for.
     * @param string       $queue     Name of the queue the job was scheduled on.
     * @param string       $class     Name of the job class.
     * @param array        $args      Arguments the job was scheduled with.
     * @return int Number of jobs that were removed.
     * @throws ResqueScheduler_InvalidTimestampException When $timestamp is not an integer value.
     */
    public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args)
    {
        $timestamp = self::getTimestamp($timestamp);
        $key = 'delayed:' . $timestamp;
        $item = json_encode(self::jobToHash($queue, $class, $args));

        $count = (int)Resque::redis()->lrem($key, 0, $item);
        self::cleanupTimestamp($key, $timestamp);

        return $count;
    }

    /**
     * Generate hash of all job properties to be saved in the scheduled queue.
     *
     * @param string $queue Name of the queue the job will be placed on.
     * @param string $class Name of the job class.
     * @param array  $args  Array of job arguments.
     * @return array Hash with the keys 'class', 'args' ($args wrapped in a
     *               single-element array) and 'queue'.
     */
    private static function jobToHash($queue, $class, $args)
    {
        return array(
            'class' => $class,
            'args'  => array($args),
            'queue' => $queue,
        );
    }

    /**
     * If there are no jobs for a given key/timestamp, delete references to it.
     *
     * Used internally to remove empty delayed: items in Redis when there are
     * no more jobs left to run at that timestamp.
     *
     * @param string       $key       Key to count number of items at.
     * @param DateTime|int $timestamp Matching timestamp for $key.
     * @throws ResqueScheduler_InvalidTimestampException When $timestamp is not an integer value.
     */
    private static function cleanupTimestamp($key, $timestamp)
    {
        $timestamp = self::getTimestamp($timestamp);
        $redis = Resque::redis();

        if ($redis->llen($key) == 0) {
            $redis->del($key);
            $redis->zrem('delayed_queue_schedule', $timestamp);
        }
    }

    /**
     * Convert a timestamp in some format in to a unix timestamp as an integer.
     *
     * Accepts date objects (DateTime, or any DateTimeInterface where that
     * interface exists), integers, and numeric strings/floats that hold an
     * integral value. Anything else (null, booleans, non-numeric strings,
     * fractional numbers, arrays, other objects) is rejected.
     *
     * @param DateTime|DateTimeInterface|int|string $timestamp Date object or UNIX timestamp.
     * @return int Timestamp
     * @throws ResqueScheduler_InvalidTimestampException
     */
    private static function getTimestamp($timestamp)
    {
        // instanceof against an undefined class/interface name simply yields
        // false, so the DateTimeInterface check is harmless where it does not exist.
        if ($timestamp instanceof DateTime || $timestamp instanceof DateTimeInterface) {
            $timestamp = $timestamp->getTimestamp();
        }

        // is_numeric() guards the loose comparison below: without it null and
        // false compare equal to 0, and so do non-numeric strings on PHP < 8,
        // silently producing a timestamp of 0.
        if (!is_numeric($timestamp) || (int)$timestamp != $timestamp) {
            throw new ResqueScheduler_InvalidTimestampException(
                'The supplied timestamp value could not be converted to an integer.'
            );
        }

        return (int)$timestamp;
    }

    /**
     * Find the first timestamp in the delayed schedule before/including the timestamp.
     *
     * Will find and return the first timestamp up to and including the given
     * timestamp. This is the heart of the ResqueScheduler that will make sure
     * that any jobs scheduled for the past when the worker wasn't running are
     * also queued up.
     *
     * @param DateTime|int|null $at Instance of DateTime or UNIX timestamp.
     *                              Defaults to now.
     * @return int|false UNIX timestamp, or false if nothing to run.
     * @throws ResqueScheduler_InvalidTimestampException When $at is given but is not an integer value.
     */
    public static function nextDelayedTimestamp($at = null)
    {
        if ($at === null) {
            $at = time();
        } else {
            $at = self::getTimestamp($at);
        }

        $items = Resque::redis()->zrangebyscore(
            'delayed_queue_schedule',
            '-inf',
            $at,
            array('limit' => array(0, 1))
        );

        if (!empty($items)) {
            // Cast so the value matches the documented int return type.
            return (int)$items[0];
        }

        return false;
    }

    /**
     * Pop a job off the delayed queue for a given timestamp.
     *
     * The list for the timestamp is cleaned up afterwards if it became empty.
     *
     * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
     * @return array|null Matching job at timestamp, or null when no job is
     *                    left (or the stored payload is not valid JSON).
     * @throws ResqueScheduler_InvalidTimestampException When $timestamp is not an integer value.
     */
    public static function nextItemForTimestamp($timestamp)
    {
        $timestamp = self::getTimestamp($timestamp);
        $key = 'delayed:' . $timestamp;

        $payload = Resque::redis()->lpop($key);

        // Only decode when a payload string was actually popped; an empty
        // list yields no string and must not be fed to json_decode().
        $item = is_string($payload) ? json_decode($payload, true) : null;

        self::cleanupTimestamp($key, $timestamp);

        return $item;
    }

    /**
     * Ensure that supplied job class/queue is valid.
     *
     * @param string $class Name of job class.
     * @param string $queue Name of queue.
     * @return bool Always true when no exception is thrown.
     * @throws Resque_Exception When $class or $queue is empty.
     */
    private static function validateJob($class, $queue)
    {
        if (empty($class)) {
            throw new Resque_Exception('Jobs must be given a class.');
        } elseif (empty($queue)) {
            throw new Resque_Exception('Jobs must be put in a queue.');
        }

        return true;
    }
}
||
270 |