resque /
php-resque
| 1 | <?php |
||||
| 2 | |||||
| 3 | /** |
||||
| 4 | * ResqueScheduler core class to handle scheduling of jobs in the future. |
||||
| 5 | * |
||||
| 6 | * @package ResqueScheduler |
||||
| 7 | * @author Chris Boulton <[email protected]> |
||||
| 8 | * @copyright (c) 2012 Chris Boulton |
||||
| 9 | * @license http://www.opensource.org/licenses/mit-license.php |
||||
| 10 | */ |
||||
| 11 | class ResqueScheduler |
||||
| 12 | { |
||||
| 13 | const VERSION = "0.1"; |
||||
| 14 | |||||
| 15 | /** |
||||
| 16 | * Enqueue a job in a given number of seconds from now. |
||||
| 17 | * |
||||
| 18 | * Identical to Resque::enqueue, however the first argument is the number |
||||
| 19 | * of seconds before the job should be executed. |
||||
| 20 | * |
||||
| 21 | * @param int $in Number of seconds from now when the job should be executed. |
||||
| 22 | * @param string $queue The name of the queue to place the job in. |
||||
| 23 | * @param string $class The name of the class that contains the code to execute the job. |
||||
| 24 | * @param array $args Any optional arguments that should be passed when the job is executed. |
||||
| 25 | */ |
||||
| 26 | public static function enqueueIn($in, $queue, $class, array $args = array()) |
||||
| 27 | { |
||||
| 28 | self::enqueueAt(time() + $in, $queue, $class, $args); |
||||
| 29 | } |
||||
| 30 | |||||
| 31 | /** |
||||
| 32 | * Enqueue a job for execution at a given timestamp. |
||||
| 33 | * |
||||
| 34 | * Identical to Resque::enqueue, however the first argument is a timestamp |
||||
| 35 | * (either UNIX timestamp in integer format or an instance of the DateTime |
||||
| 36 | * class in PHP). |
||||
| 37 | * |
||||
| 38 | * @param DateTime|int $at Instance of PHP DateTime object or int of UNIX timestamp. |
||||
| 39 | * @param string $queue The name of the queue to place the job in. |
||||
| 40 | * @param string $class The name of the class that contains the code to execute the job. |
||||
| 41 | * @param array $args Any optional arguments that should be passed when the job is executed. |
||||
| 42 | */ |
||||
| 43 | public static function enqueueAt($at, $queue, $class, $args = array()) |
||||
| 44 | { |
||||
| 45 | self::validateJob($class, $queue); |
||||
| 46 | |||||
| 47 | $job = self::jobToHash($queue, $class, $args); |
||||
| 48 | self::delayedPush($at, $job); |
||||
| 49 | |||||
| 50 | Resque_Event::trigger('afterSchedule', array( |
||||
| 51 | 'at' => $at, |
||||
| 52 | 'queue' => $queue, |
||||
| 53 | 'class' => $class, |
||||
| 54 | 'args' => $args, |
||||
| 55 | )); |
||||
| 56 | } |
||||
| 57 | |||||
| 58 | /** |
||||
| 59 | * Directly append an item to the delayed queue schedule. |
||||
| 60 | * |
||||
| 61 | * @param DateTime|int $timestamp Timestamp job is scheduled to be run at. |
||||
| 62 | * @param array $item Hash of item to be pushed to schedule. |
||||
| 63 | */ |
||||
| 64 | public static function delayedPush($timestamp, $item) |
||||
| 65 | { |
||||
| 66 | $timestamp = self::getTimestamp($timestamp); |
||||
| 67 | $redis = Resque::redis(); |
||||
| 68 | $redis->rpush('delayed:' . $timestamp, json_encode($item)); |
||||
| 69 | |||||
| 70 | $redis->zadd('delayed_queue_schedule', $timestamp, $timestamp); |
||||
| 71 | } |
||||
| 72 | |||||
| 73 | /** |
||||
| 74 | * Get the total number of jobs in the delayed schedule. |
||||
| 75 | * |
||||
| 76 | * @return int Number of scheduled jobs. |
||||
| 77 | */ |
||||
| 78 | public static function getDelayedQueueScheduleSize() |
||||
| 79 | { |
||||
| 80 | return (int)Resque::redis()->zcard('delayed_queue_schedule'); |
||||
| 81 | } |
||||
| 82 | |||||
| 83 | /** |
||||
| 84 | * Get the number of jobs for a given timestamp in the delayed schedule. |
||||
| 85 | * |
||||
| 86 | * @param DateTime|int $timestamp Timestamp |
||||
| 87 | * @return int Number of scheduled jobs. |
||||
| 88 | */ |
||||
| 89 | public static function getDelayedTimestampSize($timestamp) |
||||
| 90 | { |
||||
| 91 | $timestamp = self::getTimestamp($timestamp); |
||||
| 92 | return Resque::redis()->llen('delayed:' . $timestamp, $timestamp); |
||||
| 93 | } |
||||
| 94 | |||||
| 95 | /** |
||||
| 96 | * Remove a delayed job from the queue |
||||
| 97 | * |
||||
| 98 | * note: you must specify exactly the same |
||||
| 99 | * queue, class and arguments that you used when you added |
||||
| 100 | * to the delayed queue |
||||
| 101 | * |
||||
| 102 | * also, this is an expensive operation because all delayed keys have tobe |
||||
| 103 | * searched |
||||
| 104 | * |
||||
| 105 | * @param $queue |
||||
| 106 | * @param $class |
||||
| 107 | * @param $args |
||||
| 108 | * @return int number of jobs that were removed |
||||
| 109 | */ |
||||
| 110 | public static function removeDelayed($queue, $class, $args) |
||||
| 111 | { |
||||
| 112 | $destroyed = 0; |
||||
| 113 | $item = json_encode(self::jobToHash($queue, $class, $args)); |
||||
| 114 | $redis = Resque::redis(); |
||||
| 115 | |||||
| 116 | foreach ($redis->keys('delayed:*') as $key) { |
||||
|
0 ignored issues
–
show
Bug
introduced
by
Loading history...
|
|||||
| 117 | $key = $redis->removePrefix($key); |
||||
| 118 | $destroyed += $redis->lrem($key, 0, $item); |
||||
|
0 ignored issues
–
show
The method
lrem() does not exist on Resque_Redis. Since you implemented __call, consider adding a @method annotation.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 119 | } |
||||
| 120 | |||||
| 121 | return $destroyed; |
||||
| 122 | } |
||||
| 123 | |||||
| 124 | /** |
||||
| 125 | * removed a delayed job queued for a specific timestamp |
||||
| 126 | * |
||||
| 127 | * note: you must specify exactly the same |
||||
| 128 | * queue, class and arguments that you used when you added |
||||
| 129 | * to the delayed queue |
||||
| 130 | * |
||||
| 131 | * @param $timestamp |
||||
| 132 | * @param $queue |
||||
| 133 | * @param $class |
||||
| 134 | * @param $args |
||||
| 135 | * @return mixed |
||||
| 136 | */ |
||||
| 137 | public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args) |
||||
| 138 | { |
||||
| 139 | $key = 'delayed:' . self::getTimestamp($timestamp); |
||||
| 140 | $item = json_encode(self::jobToHash($queue, $class, $args)); |
||||
| 141 | $redis = Resque::redis(); |
||||
| 142 | $count = $redis->lrem($key, 0, $item); |
||||
| 143 | self::cleanupTimestamp($key, $timestamp); |
||||
| 144 | |||||
| 145 | return $count; |
||||
| 146 | } |
||||
| 147 | |||||
| 148 | /** |
||||
| 149 | * Generate hash of all job properties to be saved in the scheduled queue. |
||||
| 150 | * |
||||
| 151 | * @param string $queue Name of the queue the job will be placed on. |
||||
| 152 | * @param string $class Name of the job class. |
||||
| 153 | * @param array $args Array of job arguments. |
||||
| 154 | */ |
||||
| 155 | |||||
| 156 | private static function jobToHash($queue, $class, $args) |
||||
| 157 | { |
||||
| 158 | return array( |
||||
| 159 | 'class' => $class, |
||||
| 160 | 'args' => array($args), |
||||
| 161 | 'queue' => $queue, |
||||
| 162 | ); |
||||
| 163 | } |
||||
| 164 | |||||
| 165 | /** |
||||
| 166 | * If there are no jobs for a given key/timestamp, delete references to it. |
||||
| 167 | * |
||||
| 168 | * Used internally to remove empty delayed: items in Redis when there are |
||||
| 169 | * no more jobs left to run at that timestamp. |
||||
| 170 | * |
||||
| 171 | * @param string $key Key to count number of items at. |
||||
| 172 | * @param int $timestamp Matching timestamp for $key. |
||||
| 173 | */ |
||||
| 174 | private static function cleanupTimestamp($key, $timestamp) |
||||
| 175 | { |
||||
| 176 | $timestamp = self::getTimestamp($timestamp); |
||||
| 177 | $redis = Resque::redis(); |
||||
| 178 | |||||
| 179 | if ($redis->llen($key) == 0) { |
||||
| 180 | $redis->del($key); |
||||
| 181 | $redis->zrem('delayed_queue_schedule', $timestamp); |
||||
| 182 | } |
||||
| 183 | } |
||||
| 184 | |||||
| 185 | /** |
||||
| 186 | * Convert a timestamp in some format in to a unix timestamp as an integer. |
||||
| 187 | * |
||||
| 188 | * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp. |
||||
| 189 | * @return int Timestamp |
||||
| 190 | * @throws ResqueScheduler_InvalidTimestampException |
||||
| 191 | */ |
||||
| 192 | private static function getTimestamp($timestamp) |
||||
| 193 | { |
||||
| 194 | if ($timestamp instanceof DateTime) { |
||||
| 195 | $timestamp = $timestamp->getTimestamp(); |
||||
| 196 | } |
||||
| 197 | |||||
| 198 | if ((int)$timestamp != $timestamp) { |
||||
| 199 | throw new ResqueScheduler_InvalidTimestampException( |
||||
| 200 | 'The supplied timestamp value could not be converted to an integer.' |
||||
| 201 | ); |
||||
| 202 | } |
||||
| 203 | |||||
| 204 | return (int)$timestamp; |
||||
| 205 | } |
||||
| 206 | |||||
| 207 | /** |
||||
| 208 | * Find the first timestamp in the delayed schedule before/including the timestamp. |
||||
| 209 | * |
||||
| 210 | * Will find and return the first timestamp upto and including the given |
||||
| 211 | * timestamp. This is the heart of the ResqueScheduler that will make sure |
||||
| 212 | * that any jobs scheduled for the past when the worker wasn't running are |
||||
| 213 | * also queued up. |
||||
| 214 | * |
||||
| 215 | * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp. |
||||
| 216 | * Defaults to now. |
||||
| 217 | * @return int|false UNIX timestamp, or false if nothing to run. |
||||
| 218 | */ |
||||
| 219 | public static function nextDelayedTimestamp($at = null) |
||||
| 220 | { |
||||
| 221 | if ($at === null) { |
||||
| 222 | $at = time(); |
||||
| 223 | } else { |
||||
| 224 | $at = self::getTimestamp($at); |
||||
| 225 | } |
||||
| 226 | |||||
| 227 | $items = Resque::redis()->zrangebyscore('delayed_queue_schedule', '-inf', $at, array('limit' => array(0, 1))); |
||||
| 228 | if (!empty($items)) { |
||||
| 229 | return $items[0]; |
||||
| 230 | } |
||||
| 231 | |||||
| 232 | return false; |
||||
| 233 | } |
||||
| 234 | |||||
| 235 | /** |
||||
| 236 | * Pop a job off the delayed queue for a given timestamp. |
||||
| 237 | * |
||||
| 238 | * @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp. |
||||
| 239 | * @return array Matching job at timestamp. |
||||
| 240 | */ |
||||
| 241 | public static function nextItemForTimestamp($timestamp) |
||||
| 242 | { |
||||
| 243 | $timestamp = self::getTimestamp($timestamp); |
||||
| 244 | $key = 'delayed:' . $timestamp; |
||||
| 245 | |||||
| 246 | $item = json_decode(Resque::redis()->lpop($key), true); |
||||
| 247 | |||||
| 248 | self::cleanupTimestamp($key, $timestamp); |
||||
| 249 | return $item; |
||||
| 250 | } |
||||
| 251 | |||||
| 252 | /** |
||||
| 253 | * Ensure that supplied job class/queue is valid. |
||||
| 254 | * |
||||
| 255 | * @param string $class Name of job class. |
||||
| 256 | * @param string $queue Name of queue. |
||||
| 257 | * @throws Resque_Exception |
||||
| 258 | */ |
||||
| 259 | private static function validateJob($class, $queue) |
||||
| 260 | { |
||||
| 261 | if (empty($class)) { |
||||
| 262 | throw new Resque_Exception('Jobs must be given a class.'); |
||||
| 263 | } elseif (empty($queue)) { |
||||
| 264 | throw new Resque_Exception('Jobs must be put in a queue.'); |
||||
| 265 | } |
||||
| 266 | |||||
| 267 | return true; |
||||
| 268 | } |
||||
| 269 | } |
||||
| 270 |