1 | <?php |
||||
2 | |||||
3 | /** |
||||
4 | * ResqueScheduler core class to handle scheduling of jobs in the future. |
||||
5 | * |
||||
6 | * @package ResqueScheduler |
||||
7 | * @author Chris Boulton <[email protected]> |
||||
8 | * @copyright (c) 2012 Chris Boulton |
||||
9 | * @license http://www.opensource.org/licenses/mit-license.php |
||||
10 | */ |
||||
11 | class ResqueScheduler |
||||
12 | { |
||||
13 | const VERSION = "0.1"; |
||||
14 | |||||
15 | /** |
||||
16 | * Enqueue a job in a given number of seconds from now. |
||||
17 | * |
||||
18 | * Identical to Resque::enqueue, however the first argument is the number |
||||
19 | * of seconds before the job should be executed. |
||||
20 | * |
||||
21 | * @param int $in Number of seconds from now when the job should be executed. |
||||
22 | * @param string $queue The name of the queue to place the job in. |
||||
23 | * @param string $class The name of the class that contains the code to execute the job. |
||||
24 | * @param array $args Any optional arguments that should be passed when the job is executed. |
||||
25 | */ |
||||
26 | public static function enqueueIn($in, $queue, $class, array $args = array()) |
||||
27 | { |
||||
28 | self::enqueueAt(time() + $in, $queue, $class, $args); |
||||
29 | } |
||||
30 | |||||
31 | /** |
||||
32 | * Enqueue a job for execution at a given timestamp. |
||||
33 | * |
||||
34 | * Identical to Resque::enqueue, however the first argument is a timestamp |
||||
35 | * (either UNIX timestamp in integer format or an instance of the DateTime |
||||
36 | * class in PHP). |
||||
37 | * |
||||
38 | * @param DateTime|int $at Instance of PHP DateTime object or int of UNIX timestamp. |
||||
39 | * @param string $queue The name of the queue to place the job in. |
||||
40 | * @param string $class The name of the class that contains the code to execute the job. |
||||
41 | * @param array $args Any optional arguments that should be passed when the job is executed. |
||||
42 | */ |
||||
43 | public static function enqueueAt($at, $queue, $class, $args = array()) |
||||
44 | { |
||||
45 | self::validateJob($class, $queue); |
||||
46 | |||||
47 | $job = self::jobToHash($queue, $class, $args); |
||||
48 | self::delayedPush($at, $job); |
||||
49 | |||||
50 | Resque_Event::trigger('afterSchedule', array( |
||||
51 | 'at' => $at, |
||||
52 | 'queue' => $queue, |
||||
53 | 'class' => $class, |
||||
54 | 'args' => $args, |
||||
55 | )); |
||||
56 | } |
||||
57 | |||||
58 | /** |
||||
59 | * Directly append an item to the delayed queue schedule. |
||||
60 | * |
||||
61 | * @param DateTime|int $timestamp Timestamp job is scheduled to be run at. |
||||
62 | * @param array $item Hash of item to be pushed to schedule. |
||||
63 | */ |
||||
64 | public static function delayedPush($timestamp, $item) |
||||
65 | { |
||||
66 | $timestamp = self::getTimestamp($timestamp); |
||||
67 | $redis = Resque::redis(); |
||||
68 | $redis->rpush('delayed:' . $timestamp, json_encode($item)); |
||||
69 | |||||
70 | $redis->zadd('delayed_queue_schedule', $timestamp, $timestamp); |
||||
71 | } |
||||
72 | |||||
73 | /** |
||||
74 | * Get the total number of jobs in the delayed schedule. |
||||
75 | * |
||||
76 | * @return int Number of scheduled jobs. |
||||
77 | */ |
||||
78 | public static function getDelayedQueueScheduleSize() |
||||
79 | { |
||||
80 | return (int)Resque::redis()->zcard('delayed_queue_schedule'); |
||||
81 | } |
||||
82 | |||||
83 | /** |
||||
84 | * Get the number of jobs for a given timestamp in the delayed schedule. |
||||
85 | * |
||||
86 | * @param DateTime|int $timestamp Timestamp |
||||
87 | * @return int Number of scheduled jobs. |
||||
88 | */ |
||||
89 | public static function getDelayedTimestampSize($timestamp) |
||||
90 | { |
||||
91 | $timestamp = self::getTimestamp($timestamp); |
||||
92 | return Resque::redis()->llen('delayed:' . $timestamp, $timestamp); |
||||
93 | } |
||||
94 | |||||
95 | /** |
||||
96 | * Remove a delayed job from the queue |
||||
97 | * |
||||
98 | * note: you must specify exactly the same |
||||
99 | * queue, class and arguments that you used when you added |
||||
100 | * to the delayed queue |
||||
101 | * |
||||
102 | * also, this is an expensive operation because all delayed keys have tobe |
||||
103 | * searched |
||||
104 | * |
||||
105 | * @param $queue |
||||
106 | * @param $class |
||||
107 | * @param $args |
||||
108 | * @return int number of jobs that were removed |
||||
109 | */ |
||||
110 | public static function removeDelayed($queue, $class, $args) |
||||
111 | { |
||||
112 | $destroyed = 0; |
||||
113 | $item = json_encode(self::jobToHash($queue, $class, $args)); |
||||
114 | $redis = Resque::redis(); |
||||
115 | |||||
116 | foreach ($redis->keys('delayed:*') as $key) { |
||||
0 ignored issues
–
show
Bug
introduced
by
![]() |
|||||
117 | $key = $redis->removePrefix($key); |
||||
118 | $destroyed += $redis->lrem($key, 0, $item); |
||||
0 ignored issues
–
show
The method
lrem() does not exist on Resque_Redis . Since you implemented __call , consider adding a @method annotation.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
![]() |
|||||
119 | } |
||||
120 | |||||
121 | return $destroyed; |
||||
122 | } |
||||
123 | |||||
124 | /** |
||||
125 | * removed a delayed job queued for a specific timestamp |
||||
126 | * |
||||
127 | * note: you must specify exactly the same |
||||
128 | * queue, class and arguments that you used when you added |
||||
129 | * to the delayed queue |
||||
130 | * |
||||
131 | * @param $timestamp |
||||
132 | * @param $queue |
||||
133 | * @param $class |
||||
134 | * @param $args |
||||
135 | * @return mixed |
||||
136 | */ |
||||
137 | public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args) |
||||
138 | { |
||||
139 | $key = 'delayed:' . self::getTimestamp($timestamp); |
||||
140 | $item = json_encode(self::jobToHash($queue, $class, $args)); |
||||
141 | $redis = Resque::redis(); |
||||
142 | $count = $redis->lrem($key, 0, $item); |
||||
143 | self::cleanupTimestamp($key, $timestamp); |
||||
144 | |||||
145 | return $count; |
||||
146 | } |
||||
147 | |||||
148 | /** |
||||
149 | * Generate hash of all job properties to be saved in the scheduled queue. |
||||
150 | * |
||||
151 | * @param string $queue Name of the queue the job will be placed on. |
||||
152 | * @param string $class Name of the job class. |
||||
153 | * @param array $args Array of job arguments. |
||||
154 | */ |
||||
155 | |||||
156 | private static function jobToHash($queue, $class, $args) |
||||
157 | { |
||||
158 | return array( |
||||
159 | 'class' => $class, |
||||
160 | 'args' => array($args), |
||||
161 | 'queue' => $queue, |
||||
162 | ); |
||||
163 | } |
||||
164 | |||||
165 | /** |
||||
166 | * If there are no jobs for a given key/timestamp, delete references to it. |
||||
167 | * |
||||
168 | * Used internally to remove empty delayed: items in Redis when there are |
||||
169 | * no more jobs left to run at that timestamp. |
||||
170 | * |
||||
171 | * @param string $key Key to count number of items at. |
||||
172 | * @param int $timestamp Matching timestamp for $key. |
||||
173 | */ |
||||
174 | private static function cleanupTimestamp($key, $timestamp) |
||||
175 | { |
||||
176 | $timestamp = self::getTimestamp($timestamp); |
||||
177 | $redis = Resque::redis(); |
||||
178 | |||||
179 | if ($redis->llen($key) == 0) { |
||||
180 | $redis->del($key); |
||||
181 | $redis->zrem('delayed_queue_schedule', $timestamp); |
||||
182 | } |
||||
183 | } |
||||
184 | |||||
185 | /** |
||||
186 | * Convert a timestamp in some format in to a unix timestamp as an integer. |
||||
187 | * |
||||
188 | * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp. |
||||
189 | * @return int Timestamp |
||||
190 | * @throws ResqueScheduler_InvalidTimestampException |
||||
191 | */ |
||||
192 | private static function getTimestamp($timestamp) |
||||
193 | { |
||||
194 | if ($timestamp instanceof DateTime) { |
||||
195 | $timestamp = $timestamp->getTimestamp(); |
||||
196 | } |
||||
197 | |||||
198 | if ((int)$timestamp != $timestamp) { |
||||
199 | throw new ResqueScheduler_InvalidTimestampException( |
||||
200 | 'The supplied timestamp value could not be converted to an integer.' |
||||
201 | ); |
||||
202 | } |
||||
203 | |||||
204 | return (int)$timestamp; |
||||
205 | } |
||||
206 | |||||
207 | /** |
||||
208 | * Find the first timestamp in the delayed schedule before/including the timestamp. |
||||
209 | * |
||||
210 | * Will find and return the first timestamp upto and including the given |
||||
211 | * timestamp. This is the heart of the ResqueScheduler that will make sure |
||||
212 | * that any jobs scheduled for the past when the worker wasn't running are |
||||
213 | * also queued up. |
||||
214 | * |
||||
215 | * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp. |
||||
216 | * Defaults to now. |
||||
217 | * @return int|false UNIX timestamp, or false if nothing to run. |
||||
218 | */ |
||||
219 | public static function nextDelayedTimestamp($at = null) |
||||
220 | { |
||||
221 | if ($at === null) { |
||||
222 | $at = time(); |
||||
223 | } else { |
||||
224 | $at = self::getTimestamp($at); |
||||
225 | } |
||||
226 | |||||
227 | $items = Resque::redis()->zrangebyscore('delayed_queue_schedule', '-inf', $at, array('limit' => array(0, 1))); |
||||
228 | if (!empty($items)) { |
||||
229 | return $items[0]; |
||||
230 | } |
||||
231 | |||||
232 | return false; |
||||
233 | } |
||||
234 | |||||
235 | /** |
||||
236 | * Pop a job off the delayed queue for a given timestamp. |
||||
237 | * |
||||
238 | * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp. |
||||
239 | * @return array Matching job at timestamp. |
||||
240 | */ |
||||
241 | public static function nextItemForTimestamp($timestamp) |
||||
242 | { |
||||
243 | $timestamp = self::getTimestamp($timestamp); |
||||
244 | $key = 'delayed:' . $timestamp; |
||||
245 | |||||
246 | $item = json_decode(Resque::redis()->lpop($key), true); |
||||
247 | |||||
248 | self::cleanupTimestamp($key, $timestamp); |
||||
249 | return $item; |
||||
250 | } |
||||
251 | |||||
252 | /** |
||||
253 | * Ensure that supplied job class/queue is valid. |
||||
254 | * |
||||
255 | * @param string $class Name of job class. |
||||
256 | * @param string $queue Name of queue. |
||||
257 | * @throws Resque_Exception |
||||
258 | */ |
||||
259 | private static function validateJob($class, $queue) |
||||
260 | { |
||||
261 | if (empty($class)) { |
||||
262 | throw new Resque_Exception('Jobs must be given a class.'); |
||||
263 | } elseif (empty($queue)) { |
||||
264 | throw new Resque_Exception('Jobs must be put in a queue.'); |
||||
265 | } |
||||
266 | |||||
267 | return true; |
||||
268 | } |
||||
269 | } |
||||
270 |