<?php

/**
 * ResqueScheduler core class to handle scheduling of jobs in the future.
 *
 * @package   ResqueScheduler
 * @author    Chris Boulton <[email protected]>
 * @copyright (c) 2012 Chris Boulton
 * @license   http://www.opensource.org/licenses/mit-license.php
 */
class ResqueScheduler
{
    const VERSION = "0.1";

    /**
     * Enqueue a job in a given number of seconds from now.
     *
     * Identical to Resque::enqueue, however the first argument is the number
     * of seconds before the job should be executed.
     *
     * @param int    $in    Number of seconds from now when the job should be executed.
     * @param string $queue The name of the queue to place the job in.
     * @param string $class The name of the class that contains the code to execute the job.
     * @param array  $args  Any optional arguments that should be passed when the job is executed.
     */
    public static function enqueueIn($in, $queue, $class, array $args = array())
    {
        self::enqueueAt(time() + $in, $queue, $class, $args);
    }

    /**
     * Enqueue a job for execution at a given timestamp.
     *
     * Identical to Resque::enqueue, however the first argument is a timestamp
     * (either UNIX timestamp in integer format or a PHP date/time object).
     *
     * Triggers the 'afterSchedule' event once the job has been stored.
     *
     * @param DateTimeInterface|DateTime|int $at    Date/time object or int of UNIX timestamp.
     * @param string                         $queue The name of the queue to place the job in.
     * @param string                         $class The name of the class that contains the code to execute the job.
     * @param array                          $args  Any optional arguments that should be passed when the job is executed.
     * @throws Resque_Exception                          If $class or $queue is empty.
     * @throws ResqueScheduler_InvalidTimestampException If $at is not a usable timestamp.
     */
    public static function enqueueAt($at, $queue, $class, $args = array())
    {
        self::validateJob($class, $queue);

        $job = self::jobToHash($queue, $class, $args);
        self::delayedPush($at, $job);

        Resque_Event::trigger('afterSchedule', array(
            'at'    => $at,
            'queue' => $queue,
            'class' => $class,
            'args'  => $args,
        ));
    }

    /**
     * Directly append an item to the delayed queue schedule.
     *
     * The JSON-encoded item is appended to the list "delayed:<timestamp>" and
     * the timestamp is recorded (as both score and member) in the sorted set
     * "delayed_queue_schedule".
     *
     * @param DateTimeInterface|DateTime|int $timestamp Timestamp job is scheduled to be run at.
     * @param array                          $item      Hash of item to be pushed to schedule.
     * @throws ResqueScheduler_InvalidTimestampException If $timestamp is not a usable timestamp.
     */
    public static function delayedPush($timestamp, $item)
    {
        $timestamp = self::getTimestamp($timestamp);
        $redis = Resque::redis();
        $redis->rpush('delayed:' . $timestamp, json_encode($item));

        $redis->zadd('delayed_queue_schedule', $timestamp, $timestamp);
    }

    /**
     * Get the total number of entries in the delayed schedule.
     *
     * This is the cardinality of the "delayed_queue_schedule" sorted set,
     * which holds one member per distinct timestamp.
     *
     * @return int Number of scheduled timestamps.
     */
    public static function getDelayedQueueScheduleSize()
    {
        return (int)Resque::redis()->zcard('delayed_queue_schedule');
    }

    /**
     * Get the number of jobs for a given timestamp in the delayed schedule.
     *
     * @param DateTimeInterface|DateTime|int $timestamp Timestamp
     * @return int Number of scheduled jobs.
     * @throws ResqueScheduler_InvalidTimestampException If $timestamp is not a usable timestamp.
     */
    public static function getDelayedTimestampSize($timestamp)
    {
        $timestamp = self::getTimestamp($timestamp);

        // LLEN takes exactly one key; the previous stray second argument is dropped.
        return (int)Resque::redis()->llen('delayed:' . $timestamp);
    }

    /**
     * Remove a delayed job from the queue.
     *
     * Note: you must specify exactly the same queue, class and arguments that
     * you used when you added the job to the delayed queue.
     *
     * Also, this is an expensive operation because all delayed keys have to be
     * searched.
     *
     * @param string $queue Name of the queue the job was scheduled on.
     * @param string $class Name of the job class.
     * @param array  $args  Arguments the job was scheduled with.
     * @return int Number of jobs that were removed.
     */
    public static function removeDelayed($queue, $class, $args)
    {
        $destroyed = 0;
        $item = json_encode(self::jobToHash($queue, $class, $args));
        $redis = Resque::redis();
        $listPrefix = 'delayed:';

        foreach ($redis->keys($listPrefix . '*') as $key) {
            $key = $redis->removePrefix($key);
            $removed = (int)$redis->lrem($key, 0, $item);
            $destroyed += $removed;

            if ($removed > 0) {
                // Drop the timestamp from the schedule if its list is now
                // empty, so the schedule size stays accurate. Only keys with
                // an integer suffix are ones this class created.
                $suffix = substr($key, strlen($listPrefix));
                if (preg_match('/^-?\d+$/', $suffix)) {
                    self::cleanupTimestamp($key, $suffix);
                }
            }
        }

        return $destroyed;
    }

    /**
     * Remove a delayed job queued for a specific timestamp.
     *
     * Note: you must specify exactly the same queue, class and arguments that
     * you used when you added the job to the delayed queue.
     *
     * @param DateTimeInterface|DateTime|int $timestamp Timestamp the job was scheduled for.
     * @param string                         $queue     Name of the queue the job was scheduled on.
     * @param string                         $class     Name of the job class.
     * @param array                          $args      Arguments the job was scheduled with.
     * @return mixed Result of the LREM call (number of removed items).
     * @throws ResqueScheduler_InvalidTimestampException If $timestamp is not a usable timestamp.
     */
    public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args)
    {
        $key = 'delayed:' . self::getTimestamp($timestamp);
        $item = json_encode(self::jobToHash($queue, $class, $args));
        $redis = Resque::redis();
        $count = $redis->lrem($key, 0, $item);
        self::cleanupTimestamp($key, $timestamp);

        return $count;
    }

    /**
     * Generate hash of all job properties to be saved in the scheduled queue.
     *
     * @param string $queue Name of the queue the job will be placed on.
     * @param string $class Name of the job class.
     * @param array  $args  Array of job arguments.
     * @return array Hash with the keys 'class', 'args' ($args wrapped in a
     *               single-element array) and 'queue'.
     */
    private static function jobToHash($queue, $class, $args)
    {
        return array(
            'class' => $class,
            'args'  => array($args),
            'queue' => $queue,
        );
    }

    /**
     * If there are no jobs for a given key/timestamp, delete references to it.
     *
     * Used internally to remove empty delayed: items in Redis when there are
     * no more jobs left to run at that timestamp.
     *
     * @param string                         $key       Key to count number of items at.
     * @param DateTimeInterface|DateTime|int $timestamp Matching timestamp for $key.
     * @throws ResqueScheduler_InvalidTimestampException If $timestamp is not a usable timestamp.
     */
    private static function cleanupTimestamp($key, $timestamp)
    {
        $timestamp = self::getTimestamp($timestamp);
        $redis = Resque::redis();

        if ($redis->llen($key) == 0) {
            $redis->del($key);
            $redis->zrem('delayed_queue_schedule', $timestamp);
        }
    }

    /**
     * Convert a timestamp in some format in to a unix timestamp as an integer.
     *
     * Accepts any date/time object (DateTime, DateTimeImmutable, ...) as well
     * as integers and integer-valued numeric strings/floats.
     *
     * @param DateTimeInterface|DateTime|int $timestamp Date/time object or UNIX timestamp.
     * @return int Timestamp
     * @throws ResqueScheduler_InvalidTimestampException If the value is not numeric
     *                                                   or is not a whole number.
     */
    private static function getTimestamp($timestamp)
    {
        // DateTime is checked explicitly so this keeps working on runtimes
        // that predate DateTimeInterface (instanceof against an unknown name
        // is simply false).
        if ($timestamp instanceof DateTime || $timestamp instanceof DateTimeInterface) {
            $timestamp = $timestamp->getTimestamp();
        }

        // is_numeric() first: a bare loose comparison lets non-numeric strings
        // through as 0 on PHP < 8 and lets arbitrary objects through as 1.
        if (!is_numeric($timestamp) || (int)$timestamp != $timestamp) {
            throw new ResqueScheduler_InvalidTimestampException(
                'The supplied timestamp value could not be converted to an integer.'
            );
        }

        return (int)$timestamp;
    }

    /**
     * Find the first timestamp in the delayed schedule before/including the timestamp.
     *
     * Will find and return the first timestamp up to and including the given
     * timestamp. This is the heart of the ResqueScheduler that will make sure
     * that any jobs scheduled for the past when the worker wasn't running are
     * also queued up.
     *
     * @param DateTimeInterface|DateTime|int|null $at Date/time object or UNIX timestamp.
     *                                                Defaults to now.
     * @return int|string|false First matching schedule member exactly as the
     *                          Redis client returned it (not cast here), or
     *                          false if nothing to run.
     * @throws ResqueScheduler_InvalidTimestampException If $at is not a usable timestamp.
     */
    public static function nextDelayedTimestamp($at = null)
    {
        if ($at === null) {
            $at = time();
        } else {
            $at = self::getTimestamp($at);
        }

        $items = Resque::redis()->zrangebyscore('delayed_queue_schedule', '-inf', $at, array('limit' => array(0, 1)));
        if (!empty($items)) {
            return $items[0];
        }

        return false;
    }

    /**
     * Pop a job off the delayed queue for a given timestamp.
     *
     * After popping, the timestamp is removed from the schedule if its list
     * has become empty.
     *
     * @param DateTimeInterface|DateTime|int $timestamp Date/time object or UNIX timestamp.
     * @return array|null Matching job at timestamp, or null when the list is empty.
     * @throws ResqueScheduler_InvalidTimestampException If $timestamp is not a usable timestamp.
     */
    public static function nextItemForTimestamp($timestamp)
    {
        $timestamp = self::getTimestamp($timestamp);
        $key = 'delayed:' . $timestamp;

        $raw = Resque::redis()->lpop($key);

        // An empty list yields null/false from the client; do not hand that
        // to json_decode (passing null is deprecated on PHP 8.1+). The
        // result for the caller is still null.
        $item = ($raw === null || $raw === false) ? null : json_decode($raw, true);

        self::cleanupTimestamp($key, $timestamp);
        return $item;
    }

    /**
     * Ensure that supplied job class/queue is valid.
     *
     * @param string $class Name of job class.
     * @param string $queue Name of queue.
     * @return bool Always true when validation passes.
     * @throws Resque_Exception If $class or $queue is empty.
     */
    private static function validateJob($class, $queue)
    {
        if (empty($class)) {
            throw new Resque_Exception('Jobs must be given a class.');
        } elseif (empty($queue)) {
            throw new Resque_Exception('Jobs must be put in a queue.');
        }

        return true;
    }
}